Attention-Enhanced Deep Reinforcement Learning for Autonomous Driving¶
Note to the examiner. Due to GitHub file size limitations, this notebook does not contain embedded videos showing the performance of each trained agent in the different environments. For the most immersive experience, please clone the repository and run this notebook. Running the notebook should only take 1 to 2 minutes. Otherwise, please refer to the Final.html file as it contains plots which might not render in the Final.ipynb notebook. To run the notebook, please ensure that the following packages are installed:
# !pip install highway-env
# !pip install keras --upgrade
# !pip install gymnasium
# !pip install opencv-python
# !pip install plotly
# !pip install ipython
# !pip install pandas
# !pip install numpy
# !pip install scipy
# !pip install tensorflow
Abstract¶
Recent advancements in autonomous driving technology have significantly accelerated the exploration of safe and efficient vehicular operation. A critical element in the development of self-driving vehicles is addressing the intricate decision-making challenges inherent in autonomous navigation. In this project, we specifically target these challenges by investigating the efficacy of various deep reinforcement learning models, namely the Deep Q-Network (DQN), Asynchronous Advantage Actor-Critic (A3C), and Proximal Policy Optimisation (PPO), across a series of automated driving tasks. Additionally, we integrate an attention-based framework to enhance the neural network architecture, which optimises our models' ability to process complex environments. Our findings indicate that policy-based algorithms consistently outperform value-based ones in terms of safety and efficiency during training phases.
1. Introduction¶
The US Department of Transportation reports that human error is responsible for approximately 94% of car crashes [1]. In response, advancements in artificial intelligence have significantly accelerated innovation in vehicle automation, offering potential improvements in safety and efficiency. Despite these advances, incidents of fatalities involving autonomous vehicles have not ceased [2, 3]. This highlights the necessity for advanced decision-making models that can accurately navigate the complexities of the driving environment.
Consequently, we develop a reinforcement learning (RL) framework, which is capable of evaluating the current situation and formulating an optimal policy. Given the environment's complexity, we aim to expand the current understanding of deep reinforcement learning for autonomous driving by further exploring three algorithms: DQN, A3C and PPO. We utilise the HighwayEnv [4] to model the driving environment, which simplifies driving conditions effectively while retaining the essential complexity found in most driving scenarios. To evaluate the behavior of these agents, we simulate a range of tasks including reactions to a vehicle merging onto a main road, highway navigation, and maneuvering through a roundabout. For each agent, we implement an encoder-decoder neural network architecture using self-attention and compare it with a baseline Multi-Layer-Perceptron (MLP). We investigate whether using attention to determine the level of danger of each vehicle on the road allows each deep reinforcement learning agent to learn a more efficient and safer policy.
To evaluate the performance of our algorithms, we employ four metrics: total accumulated rewards, total steps taken, average speed, and distribution of actions. Our findings suggest that the on-policy deep RL methods outperform the off-policy one. Additionally, we identify several limitations within the environment that restrict the capabilities of our models.
2. Related work¶
Deep reinforcement learning has been extensively applied in autonomous driving to manage complex and dynamic driving scenarios. Approaches range from value-based methods like DQN and its variants (Double DQN and Dueling DQN), typically used in simpler navigational tasks [8, 9, 10], to policy-based methods like Soft Actor-Critic, A3C, and PPO, which are especially effective in managing continuous control and complex interactions [9, 11, 12]. While these methods have achieved significant advancements in simulating real-world environments, they often face challenges in accurately representing continuous state spaces, which is crucial for making precise navigational decisions in such dynamic settings.
Recent advancements propose the integration of attention mechanisms to improve the processing of environmental inputs in autonomous driving. Leurent et al. discuss an attention-based architecture that addresses the variability and complexity of traffic scenarios by focusing on dynamic interactions among vehicles, significantly enhancing performance and interpretability [5, 9]. This approach demonstrates substantial gains in complex negotiation tasks by providing a robust method for capturing and visualizing interactive patterns in dense traffic.
Moreover, the recent development of deep reinforcement learning, which combines the perception capabilities of deep learning with the decision-making capability of reinforcement learning, has opened the way to models that generalise better across different driving environments. Wang et al. proposed a new state representation combining a bird's-eye-view image with vector input, in which an attention mechanism is implemented alongside a DRL algorithm to enhance the decision process. Using a D3QN method, they showed that combining vector and image inputs led to a better state representation for decision-making, and that adding the attention mechanism further improved the results [14].
Despite progress, there has been little research on integrating attention mechanisms with deep RL models for autonomous driving. Current methods often fail to combine efficient state representation with policy optimization effectively. Our project attempts to further bridge this gap by enhancing DQN, A3C, and PPO algorithms with an attention-based architecture, aiming to improve decision accuracy and performance in navigating complex driving environments.
3. The Environment¶
This section provides a detailed description of the HighwayEnv [4] used for the comprehensive training process. It includes a variety of driving environments such as "merge" (in both its default and randomized forms), "highway," and "roundabout," all of which share similar features in terms of state representation, permissible actions, and reward systems. It is important to note that the environment assumes driving on the right-hand side of the road.
- State representation: For this project, we adopt the "Kinematics" setting to depict each observation/state in a continuous format. Each state is presented as an array consisting of five sub-arrays, each representing a vehicle. The first sub-array always represents the agent (the ego-vehicle). Every vehicle array includes a set of five features that detail its position in the environment and its velocity. These features are:
- presence - indicates the detection of a vehicle (0 if not detected, 1 if detected)
- x - horizontal position along the x-axis
- y - vertical position along the y-axis
- vx - horizontal velocity along the x-axis
- vy - vertical velocity along the y-axis
All features are normalized and adjusted relative to the ego-vehicle's features to optimize state representation.
- Actions: The "DiscreteMetaActions" configuration is employed, offering the following simplified actions for the ego-vehicle:
- 0 - switch to the left lane
- 1 - keep same lane, same speed
- 2 - switch to the right lane
- 3 - decelerate
- 4 - accelerate
Actions for other vehicles involved in the simulations adhere to a simplified model, which prevents intentional collisions with other vehicles.
- Rewards:
The reward formula varies across different environments but generally encompasses the following components:
- collision - penalty, applied for collisions
- high_speed - reward, given for maintaining high speeds
- right_lane - reward for staying in the right lane
- merging_speed - altruistic penalty, issued for low speed on the merging lane
- on_road - reward for being on the road (0 if outside of the road, 1 if on the road)
The specific values for the rewards and penalties vary between environments, but the underlying formulas generally remain consistent. The rewards and penalties are summed, and the result is normalized to fall between 0 and 1. These values are selected to encourage safe and efficient driving practices, including adherence to general driving rules such as avoiding unnecessary use of the left lanes. More detailed explanations are provided in subsequent sections.
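As an illustration of the state representation described above, a single observation under the "Kinematics" setting is a 5 × 5 array, one row per vehicle. The values below are invented purely for illustration:

```python
import numpy as np

# Hypothetical "Kinematics" observation: 5 vehicles x 5 features.
# Columns: presence, x, y, vx, vy (normalized, relative to the ego-vehicle).
observation = np.array([
    [1.0,  0.00,  0.00,  0.25,  0.00],  # ego-vehicle (always row 0)
    [1.0,  0.15, -0.05, -0.03,  0.00],  # slower vehicle ahead
    [1.0, -0.10,  0.05,  0.02,  0.00],  # vehicle behind, one lane over
    [1.0,  0.30,  0.00, -0.05,  0.00],  # distant vehicle ahead
    [0.0,  0.00,  0.00,  0.00,  0.00],  # no fifth vehicle detected
], dtype=np.float32)

assert observation.shape == (5, 5)
```

Only the presence flag is binary; the remaining features are continuous and scaled relative to the ego-vehicle.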
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
from copy import deepcopy
import concurrent
import cv2
import gymnasium as gym
import ipywidgets as widgets
from IPython.display import display, HTML
from keras.layers import Dense, Input, Layer, Flatten, Concatenate
from keras.models import Model
from keras.optimizers import Adam
import numpy as np
import pandas as pd
import plotly
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import scipy.signal
import tensorflow as tf
plotly.offline.init_notebook_mode()
3.1 Merge¶
In the "merge" environment, the ego-vehicle must adopt a policy that facilitates the incorporation of a vehicle from a merging lane onto the main road. The corresponding reward function is defined as:
$$ R(s, a) = \frac{collision + high\_speed + merging\_speed + right\_lane - min}{max - min} $$
where: $collision = -1, high\_speed = 0.2, merging\_speed = -0.5, right\_lane = 0.1$, $min$ and $max$ are the minimum and maximum possible sums of these components, ensuring that $ R(s, a)$ is normalized between $0$ and $1$.
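As a concrete sketch of this normalisation (component values from above; the worst-case and best-case totals below are our assumptions about how $min$ and $max$ are formed):

```python
def normalized_reward(components, min_total, max_total):
    """Min-max normalise the summed reward components into [0, 1]."""
    total = sum(components.values())
    return (total - min_total) / (max_total - min_total)

# Hypothetical step: no collision, full high-speed reward, right lane kept,
# no merging-speed penalty (component values from the merge environment above).
components = {"collision": 0.0, "high_speed": 0.2, "merging_speed": 0.0, "right_lane": 0.1}
min_total = -1.0 - 0.5   # collision and merging-speed penalties at their worst
max_total = 0.2 + 0.1    # high-speed and right-lane rewards at their best
r = normalized_reward(components, min_total, max_total)
assert 0.0 <= r <= 1.0
```

In this best-case step the summed components equal the maximum, so the normalised reward is 1.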
It is important to note that this environment, in its default settings, represents an overly-simplified scenario, as it only involves fixed initial positions for the other vehicles and no change in speeds. Thus, we modify the default "merge" environment to initialise random positions for the other vehicles, thereby increasing the complexity of the task. The rest of the principles in the modified environment remain consistent with those of the default environment. The code for the modified "merge" environment is presented in Appendix B.
# set render_mode to "human" to render the simulations
merge_env_config = {
"observation":{
"type":"Kinematics"
},
"action": {
"type": "DiscreteMetaAction"
},
"policy_frequency":8,
"simulation_frequency":16,
'collision_reward': -1,
"normalize_reward": True,
"random_spawn":True
}
merge_env = gym.make("merge-v0", render_mode="rgb_array")
merge_env.configure(merge_env_config)
3.2 Highway¶
In the "highway" environment, the ego-vehicle is positioned on a four-lane, one-way road alongside 50 other vehicles. It must navigate through this setting for a predetermined number of timesteps, adhering to a policy that aims to prevent collisions. The reward function is defined as follows:
$$ R(s, a) = \frac{collision + high\_speed + right\_lane - min}{max - min} \times on\_road $$
where: $collision = -1, high\_speed = 0.4, right\_lane = 0.1$, $min$ and $max$ are the minimum and maximum possible sums of these components, ensuring that $ R(s, a)$ is normalized between $0$ and $1$.
# set render_mode to "human" to render the simulations
highway_env_config = {
"observation":{
"type":"Kinematics"
},
"action": {
"type": "DiscreteMetaAction"
},
"policy_frequency":8,
"simulation_frequency":16,
'collision_reward': -1,
"normalize_reward": True,
"random_spawn":True
}
highway_env = gym.make("highway-v0", render_mode="rgb_array")
highway_env.configure(highway_env_config)
3.3 Roundabout¶
The "roundabout" environment features a roundabout with four exits. The ego-vehicle starts at one exit and aims to safely navigate to and exit from the opposite side. During this maneuver, it must avoid collisions with other vehicles circulating within the roundabout. The reward function for this environment is outlined as follows:
$$ R(s, a) = \frac{collision + high\_speed - min}{max - min} \times on\_road $$
where: $collision = -1, high\_speed = 0.2$, $min$ and $max$ are the minimum and maximum possible sums of these components, ensuring that $ R(s, a)$ is normalized between $0$ and $1$.
# set render_mode to "human" to render the simulations
roundabout_env_config = {
"observation":{
"type":"Kinematics"
},
"action": {
"type": "DiscreteMetaAction"
},
"policy_frequency":8,
"simulation_frequency":16,
'collision_reward': -1,
"normalize_reward": True,
"random_spawn":True
}
roundabout_env = gym.make("roundabout-v0", render_mode="rgb_array")
roundabout_env.configure(roundabout_env_config)
4. Methodology¶
Reinforcement learning provides a robust framework for sequential decision-making in environments characterized by uncertainty, making it highly suitable for our investigation into autonomous driving. In this project, we implement and analyze three deep reinforcement learning agents, each designed to navigate and make decisions in complex driving scenarios.
4.1 Deep Q-Network¶
The Deep Q-Network (DQN) algorithm is a model-free, online, off-policy reinforcement learning method that integrates the robust capabilities of Q-learning with the pattern recognition strength of deep neural networks. A DQN agent leverages a neural network to approximate the state-action value function $Q$, commonly known as the Q-function. This allows it to perform Q-learning with function approximation, enabling the handling of environments with high-dimensional state spaces.
The key components of the algorithm are:
Neural Network Architecture: The state is input into the network, which then outputs Q-values for each possible action, aiming to approximate the optimal Q-function that satisfies the Bellman equation: $$ q_*(s,a) = \mathbb{E} \left[ R_{t+1} + \gamma \max_{a'} q_*(S', a') \right] $$ Here, the goal is to minimize the loss between the predicted Q-values and the Q-values obtained from the Bellman equation, thereby refining the policy.
Experience Replay: To enhance learning efficiency and stabilize updates, DQN utilizes experience replay. This technique involves storing the agent’s experiences at each time step in a replay memory, denoted as: $$ E_t = (s_t, a_t, r_{t+1}, s_{t+1}) $$ These experiences are sampled randomly to train the network, which helps to break the correlation between consecutive learning samples and reduces the variance of updates.
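A minimal replay memory along these lines, assuming a fixed capacity and uniform random sampling (the capacity and batch size are illustrative choices, not the ones used later in this notebook):

```python
import random
from collections import deque

class ReplayMemory:
    """Fixed-size buffer of (s_t, a_t, r_{t+1}, s_{t+1}, done) transitions."""
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)  # oldest experiences are evicted first

    def store(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Uniform random sampling breaks the correlation between consecutive transitions
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

memory = ReplayMemory(capacity=1000)
for t in range(50):
    memory.store(state=t, action=t % 5, reward=1.0, next_state=t + 1, done=False)
batch = memory.sample(batch_size=8)
assert len(batch) == 8
```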
Exploration-Exploitation Strategy: DQN employs an $\epsilon$-greedy policy, where $\epsilon$ is gradually decreased. This strategy balances the need to explore the environment to find new strategies and the need to exploit the best-known strategies.
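For illustration, $\epsilon$-greedy selection with a linearly decayed $\epsilon$ might look like the following sketch (the decay schedule is an assumption, not necessarily the one used in any particular DQN implementation):

```python
import numpy as np

rng = np.random.default_rng(0)

def epsilon_greedy(q_values, epsilon):
    """With probability epsilon explore uniformly; otherwise exploit argmax Q."""
    if rng.random() < epsilon:
        return int(rng.integers(len(q_values)))  # explore
    return int(np.argmax(q_values))              # exploit

def decayed_epsilon(step, eps_start=1.0, eps_end=0.05, decay_steps=10_000):
    # Linear interpolation from eps_start down to eps_end over decay_steps
    frac = min(step / decay_steps, 1.0)
    return eps_start + frac * (eps_end - eps_start)

q = np.array([0.1, 0.8, 0.3, 0.2, 0.0])
assert epsilon_greedy(q, epsilon=0.0) == 1  # pure exploitation picks argmax
```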
Originally developed by Mnih et al., DQN marked a significant advancement in reinforcement learning by demonstrating that deep learning could effectively be combined with Q-learning. Their work showed that a single reinforcement learning agent could achieve high-level performance across 49 different Atari games without the need for problem-specific feature engineering. This was pivotal in illustrating that deep learning could enable reinforcement learning agents to learn problem-specific features autonomously, thereby simplifying the implementation and reducing the reliance on domain-specific knowledge [6].
By integrating these techniques, DQN effectively addresses the challenges of learning in complex and high-dimensional environments, establishing a foundation for further innovations in reinforcement learning.
4.2 Asynchronous Advantage Actor-Critic¶
The Asynchronous Advantage Actor-Critic (A3C) algorithm belongs to the family of policy gradient methods and is distinguished by its unique architecture. A3C operates with a policy $\pi(a_t | s_t; \theta)$ and simultaneously estimates the value function $V(s_t; \theta_v)$. This algorithm functions on a forward-view basis, updating the policy and value function using a mixture of $n$-step returns that are updated every $t_{max}$ steps or upon reaching an end state.
A3C utilizes multiple agents that run asynchronously in parallel environments, each collecting states, actions, and rewards. These are periodically synchronized with global parameters. This asynchronous operation, a key differentiator from its synchronous counterpart, A2C, allows for more efficient exploration of the state-action space as independent agents can explore different strategies without waiting for batch updates.
The update process in A3C involves two steps:
First, an estimate of the advantage function $A(s_t, a_t; \theta, \theta_v)$ is calculated as: $$ \sum_{i=0}^{k-1} \gamma^i r_{t+i} + \gamma^k V(s_{t+k}; \theta_v) - V(s_t; \theta_v) $$ This function estimates the relative benefit of taking action $a$ in state $s$ compared to the average action value for that state.
Then, the policy is updated using the gradient: $$ \nabla_{\theta'} \log \pi(a_t | s_t; \theta') A(s_t, a_t; \theta, \theta_v) $$ The gradients are accumulated over multiple training episodes to enhance stability.
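The advantage estimate above can be checked numerically; the following sketch uses invented rewards and an invented critic value for $V(s_t)$:

```python
def n_step_advantage(rewards, value_s_t, bootstrap_value, gamma=0.99):
    """A(s_t) = sum_i gamma^i * r_{t+i} + gamma^k * V(s_{t+k}) - V(s_t)."""
    k = len(rewards)
    n_step_return = sum(gamma**i * r for i, r in enumerate(rewards))
    n_step_return += gamma**k * bootstrap_value  # bootstrap from V(s_{t+k})
    return n_step_return - value_s_t

rewards = [0.5, 0.4, 0.6]  # r_t, r_{t+1}, r_{t+2} (illustrative)
adv = n_step_advantage(rewards, value_s_t=1.2, bootstrap_value=1.0)
```

A positive advantage indicates that the sampled actions performed better than the critic's estimate of the state, so their log-probabilities are increased by the gradient step.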
First introduced by DeepMind in 2016, A3C demonstrated superior performance in terms of training efficiency and computational demands. In their seminal paper, the researchers compared three variations of A3C against four DQN-based methods and the Gorilla method. Across 57 Atari games, A3C's variations consistently outperformed the competing methods, underscoring its effectiveness. The use of parallel actor-learners to update global parameters was found to significantly stabilize the learning process, facilitating faster and more efficient training [13].
4.3 Proximal Policy Optimization¶
The Proximal Policy Optimization (PPO) algorithm is a policy gradient method that utilizes multiple epochs of stochastic gradient ascent for each policy update. As an on-policy algorithm, PPO is versatile and suitable for both discrete and continuous action spaces. It is designed to enhance stability and sample efficiency over its predecessors, such as the Trust Region Policy Optimization (TRPO). Unlike TRPO, which relies on complex second-order methods, PPO simplifies implementation by using first-order methods, yet achieves comparable empirical performance.
Introduced by Schulman et al., PPO comes in two primary variants: PPO-Penalty and PPO-Clip. In our project, we employ PPO-Clip, as it typically shows superior performance. This variant mitigates the risk of significant deviations from the previous policy by employing a clipped surrogate objective function, which effectively manages the trade-off between exploiting the new policy's advantage and adhering closely to the previous policy. This approach prevents large policy updates that could lead to performance degradation and instability.
The policy update mechanism in PPO-Clip is defined as follows:
$$ \theta_{k+1} = \underset{\theta}{\text{argmax}} \; \underset{s,a \sim \pi_{\theta_k}}{\mathbb{E}} \left[ L(s,a,\theta_k, \theta) \right] $$
where the objective function $L$ is calculated by:
$$ L(s,a,\theta_k,\theta) = \min\left( \frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)} A^{\pi_{\theta_k}}(s,a), \; \text{clip}\left(\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)}, 1 - \epsilon, 1+\epsilon \right) A^{\pi_{\theta_k}}(s,a) \right) $$
Here, $ \epsilon $ delineates the allowable deviation between the new and old policies. The objective function consists of two parts: the conservative policy iteration $ L^{\text{CPI}} $ which aims to maximize the advantage of the current policy, and the clipping function $ L^{\text{CLIP}} $ which restricts large updates by penalizing changes that move the probability ratio $ r_t(\theta) $ away from 1:
$$ L^{\text{CLIP}}(\theta) = \hat{E}_t[\text{min}\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1 - \epsilon, 1 + \epsilon)\hat{A}_t\right)] $$
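The clipping behaviour can be illustrated in a few lines of numpy (the ratios and advantages are invented):

```python
import numpy as np

def ppo_clip_objective(ratio, advantage, epsilon=0.2):
    """Element-wise min(r_t * A_t, clip(r_t, 1 - eps, 1 + eps) * A_t)."""
    clipped = np.clip(ratio, 1.0 - epsilon, 1.0 + epsilon)
    return np.minimum(ratio * advantage, clipped * advantage)

ratios = np.array([0.5, 1.0, 1.5])      # pi_theta(a|s) / pi_theta_k(a|s)
advantages = np.array([1.0, 1.0, 1.0])  # positive advantages
obj = ppo_clip_objective(ratios, advantages)
# With a positive advantage, a ratio of 1.5 is clipped to 1.2, capping the update.
```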
PPO trains a stochastic policy in an on-policy manner where actions are sampled by the latest policy iteration. The policy evolves to become less random as training progresses, reducing the risk of converging to local optima but increasing the need for careful exploration management.
PPO was developed by Schulman et al. to combine the stability and robustness of TRPO with a more straightforward implementation approach. The efficacy of PPO was demonstrated through comparative studies with A2C and ACER across various domains, including robotic locomotion and several Atari games, where it consistently outperformed other online policy gradient methods [7].
Note: We also investigated a model-based method, specifically Monte Carlo Tree Search (MCTS). However, due to its time inefficiency and unsatisfactory performance, we decided not to continue with a detailed analysis of this algorithm. A brief explanation of the method and selected results can be found in Appendix A.
5. Implementation¶
This section outlines the implementation details for the three deep reinforcement learning agents used in the project. The two neural network architectures are explained, the use of a replay buffer is described for each agent, and the modifications made in the implementation of each deep reinforcement learning agent are outlined.
5.1 Buffer¶
For each deep reinforcement learning algorithm we employ a replay buffer to store transitions over a number of episodes. The transitions in the buffer are then used to calculate losses and perform gradient descent to update the model weights. For DQN and A3C, the buffer implementation is straightforward, as it consists of a series of arrays initialized at the beginning of each episode. In the case of A3C, each parallel agent has its own replay buffer. For PPO, the buffer is more complex and has therefore been implemented as a class. Apart from storing and retrieving transitions, the PPO buffer additionally finishes trajectories using the advantage function.
class Buffer():
# Buffer for storing trajectories
def __init__(self, observation_dimensions, size, gamma=0.99, lam=0.95):
# Buffer initialization
self.observation_buffer = np.zeros(
(size, observation_dimensions), dtype=np.float32
)
self.action_buffer = np.zeros(size, dtype=np.int32)
self.advantage_buffer = np.zeros(size, dtype=np.float32)
self.reward_buffer = np.zeros(size, dtype=np.float32)
self.return_buffer = np.zeros(size, dtype=np.float32)
self.value_buffer = np.zeros(size, dtype=np.float32)
self.logprobability_buffer = np.zeros(size, dtype=np.float32)
self.gamma, self.lam = gamma, lam
self.pointer, self.trajectory_start_index = 0, 0
def store(self, observation, action, reward, value, logprobability):
# Append one step of agent-environment interaction
self.observation_buffer[self.pointer] = observation
self.action_buffer[self.pointer] = action
self.reward_buffer[self.pointer] = reward
self.value_buffer[self.pointer] = value
self.logprobability_buffer[self.pointer] = logprobability
self.pointer += 1
def discounted_cumulative_sums(self, x, discount):
# Discounted cumulative sums of vectors for computing rewards-to-go and advantage estimates
return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1]
def finish_trajectory(self, last_value=0):
# Finish the trajectory by computing advantage estimates and rewards-to-go
path_slice = slice(self.trajectory_start_index, self.pointer)
rewards = np.append(self.reward_buffer[path_slice], last_value)
values = np.append(self.value_buffer[path_slice], last_value)
deltas = rewards[:-1] + self.gamma * values[1:] - values[:-1]
self.advantage_buffer[path_slice] = self.discounted_cumulative_sums(
deltas, self.gamma * self.lam
)
self.return_buffer[path_slice] = self.discounted_cumulative_sums(
rewards, self.gamma
)[:-1]
self.trajectory_start_index = self.pointer
def get(self):
# Get all data of the buffer and normalize the advantages
self.pointer, self.trajectory_start_index = 0, 0
advantage_mean, advantage_std = (
np.mean(self.advantage_buffer),
np.std(self.advantage_buffer),
)
self.advantage_buffer = (self.advantage_buffer - advantage_mean) / advantage_std
        return_min, return_max = (
            np.min(self.return_buffer),
            np.max(self.return_buffer),
        )
# min max scaling
self.return_buffer = 2*((self.return_buffer - return_min)/(return_max - return_min)) - 1
return (
self.observation_buffer,
self.action_buffer,
self.advantage_buffer,
self.return_buffer,
self.logprobability_buffer,
)
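The `discounted_cumulative_sums` helper above relies on an `lfilter` trick; it is equivalent to the explicit backward recursion below, which is a pure-numpy stand-in useful for sanity-checking:

```python
import numpy as np

def discounted_cumulative_sums_loop(x, discount):
    """y_t = x_t + discount * y_{t+1}, computed backwards (rewards-to-go)."""
    out = np.zeros_like(x, dtype=np.float64)
    running = 0.0
    for t in reversed(range(len(x))):
        running = x[t] + discount * running
        out[t] = running
    return out

rewards = np.array([1.0, 0.5, 0.25, 0.0])
rtg = discounted_cumulative_sums_loop(rewards, 0.99)
# Each entry satisfies the recursion against the entry after it
assert np.isclose(rtg[0], 1.0 + 0.99 * rtg[1])
```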
5.2 Architectures¶
In the context of autonomous driving, MLP-based deep reinforcement learning architectures have been shown to struggle to represent non-discrete observation spaces. As such, alternative approaches have been considered, including convolutional neural networks and transformers [5]. Attention, in particular, has been shown to greatly increase the performance of the DQN algorithm by allowing the Q-value network to identify the vehicles that pose the greatest threat to the ego vehicle. The attention network architecture implemented in this project closely follows that proposed by Leurent [5], which consists of:
- An input layer which receives an array of $N$ vehicle feature arrays where the first array corresponds to the ego vehicle.
- An encoding layer which applies a linear encoding to transform each feature array $n$ into an array of keys $k_n$ and values $v_n$. The linear encoding is performed using a fully connected layer with an output (encoding) dimension $d_k$. For the ego vehicle, an additional array of queries $q_0$ is created. The output of the encoding layer is three arrays: $Q = (q_0)$, $K = (k_0 \ldots k_n)$, $V = (v_0 \ldots v_n)$.
- An ego-attention layer, implemented in the EgoAttentionLayer class, which outputs $\text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right) V$. This operation can be split to obtain multi-head attention; in our implementation we use 2 attention heads.
- A fully connected output layer mapped to the action space for the actor and an output size of 1 for the critic. In the case of DQN, there is only one output layer mapped to the action space.
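To make the ego-attention computation concrete, here is a single-head numpy sketch (the dimensions are illustrative, and the real layer additionally applies the learned projections $W_q$, $W_k$, $W_v$):

```python
import numpy as np

def ego_attention(Q, K, V):
    """softmax(Q K^T / sqrt(d_k)) V for a single ego query over N vehicles."""
    d_k = K.shape[-1]
    scores = Q @ K.T / np.sqrt(d_k)        # (1, N) attention logits
    weights = np.exp(scores - scores.max())
    weights /= weights.sum()               # softmax over the N vehicles
    return weights @ V, weights

rng = np.random.default_rng(0)
N, d_k = 5, 64                             # 5 vehicles, encoding dimension 64
Q = rng.normal(size=(1, d_k))              # ego query q_0 only
K = rng.normal(size=(N, d_k))              # keys k_0 ... k_n
V = rng.normal(size=(N, d_k))              # values v_0 ... v_n
out, weights = ego_attention(Q, K, V)
assert out.shape == (1, d_k)
```

The attention weights form a distribution over the vehicles, which is what allows the network to "focus" on the most dangerous ones.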
To evaluate the performance of the attention models, a standard Multi-Layer-Perceptron (MLP) architecture was additionally implemented. The MLP architecture is comprised of the following layers:
- A flattening layer which converts the observation input to a one-dimensional array.
- Two fully connected layers with an output size of 64 and tanh activation. These layers are shared by the actor and critic for the PPO and A3C agents and are connected to the action and value outputs respectively. The action output is a fully connected layer with linear activation, mapped to the action space. For PPO and A3C, a log-softmax is applied to the output of the network to sample an action; for DQN, no additional activation is applied. For PPO and A3C, the critic value is calculated as the output of a fully connected layer with an output size of 1 and linear activation.
class EgoAttentionLayer(Layer):
def __init__(self, num_heads, encoding_dim, **kwargs):
super(EgoAttentionLayer, self).__init__(**kwargs)
self.num_heads = num_heads
self.encoding_dim = encoding_dim
assert self.encoding_dim % self.num_heads == 0
self.depth = self.encoding_dim // self.num_heads
self.wq = Dense(self.encoding_dim)
self.wk = Dense(self.encoding_dim)
self.wv = Dense(self.encoding_dim)
def split_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, q, k, v, mask=None):
batch_size = tf.shape(q)[0]
q = self.split_heads(self.wq(q), batch_size)
k = self.split_heads(self.wk(k), batch_size)
v = self.split_heads(self.wv(v), batch_size)
scaled_attention_logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(tf.cast(self.depth, tf.float32))
if mask is not None:
scaled_attention_logits += (mask * -1e9)
attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
output = tf.matmul(attention_weights, v)
return tf.reshape(output, (batch_size, -1, self.encoding_dim))
5.3 DQN¶
Several modifications have been made when implementing the DQN agent. Firstly, the optimizer for the value function has been changed from RMSProp to Adam in order to maintain consistency with the A3C and PPO agents. For each episode, the DQN agent performs N consecutive runs, determined by the num_actors parameter. The motivation behind this change is to allow for a better comparison with A3C, which performs N parallel runs in each episode. The transitions in each of the N runs are stored in a replay buffer.
Another major change is the exploration strategy. Instead of $\epsilon$-greedy action selection, this implementation uses Boltzmann (softmax) sampling, akin to PPO and A3C: at each timestep $t$, the action is sampled as $a_t \sim \text{softmax}(Q(s_t, \cdot\,; \theta))$. We found this strategy to converge to a better policy than $\epsilon$-greedy.
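A numpy sketch of this sampling strategy (a stand-in for the Keras `categorical` call used in `sample_action` below; the Q-values are invented):

```python
import numpy as np

rng = np.random.default_rng(0)

def boltzmann_sample(q_values):
    """Sample an action with probability proportional to softmax(Q)."""
    logits = q_values - q_values.max()            # shift for numerical stability
    probs = np.exp(logits) / np.exp(logits).sum()
    return int(rng.choice(len(q_values), p=probs)), probs

q = np.array([0.1, 0.8, 0.3, 0.2, 0.0])
action, probs = boltzmann_sample(q)
# Higher-valued actions are sampled more often, but every action stays possible,
# so exploration happens without an explicit epsilon schedule.
```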
class DQNAgent():
def __init__(self,
input_shape,
num_actions,
gamma=0.99,
learning_rate=3e-4,
encoding_dim=64,
num_heads=2,
use_attention=False):
self.input_shape = input_shape
self.num_actions = num_actions
self.gamma = gamma
self.encoding_dim = encoding_dim
self.num_heads = num_heads
self.use_attention = use_attention
self.optimizer = Adam(learning_rate=learning_rate)
self.loss_function = tf.keras.losses.Huber()
if self.use_attention:
self.value_function = self.build_ego_attention_network()
self.target_value_function = self.build_ego_attention_network()
else:
self.value_function = self.build_mlp()
self.target_value_function = self.build_mlp()
def build_mlp(self):
input_layer = Input(shape=self.input_shape)
x = Dense(64, activation="tanh")(input_layer)
x = Dense(64, activation="tanh")(x)
value_output = Dense(self.num_actions, activation="linear")(x)
return Model(inputs=input_layer, outputs=value_output)
def build_ego_attention_network(self):
input_layer = Input(shape=self.input_shape, name='input_layer')
        keys, values = [], []
        for i in range(self.input_shape[0]):
            feature_vector = input_layer[:, i, :]
            if i == 0:
                # Ego encoder: output L_q, L_k, L_v
                L_q = Dense(self.encoding_dim, activation='linear', name='ego_encoding_L_q')(feature_vector)
                L_k = Dense(self.encoding_dim, activation='linear', name='ego_encoding_L_k')(feature_vector)
                L_v = Dense(self.encoding_dim, activation='linear', name='ego_encoding_L_v')(feature_vector)
            else:
                # Regular encoder: output L_k and L_v
                L_k = Dense(self.encoding_dim, activation='linear', name=f'encoding_L_k_{i}')(feature_vector)
                L_v = Dense(self.encoding_dim, activation='linear', name=f'encoding_L_v_{i}')(feature_vector)
            keys.append(tf.keras.ops.expand_dims(L_k, axis=1))
            values.append(tf.keras.ops.expand_dims(L_v, axis=1))
        # Attend over every vehicle: Q = (q_0), K = (k_0 ... k_n), V = (v_0 ... v_n)
        K = Concatenate(axis=1)(keys)
        V = Concatenate(axis=1)(values)
        # Ego attention layer
        ego_attention = EgoAttentionLayer(num_heads=self.num_heads, encoding_dim=self.encoding_dim, name='ego_attention_layer')
        ego_attention_output = ego_attention(L_q, K, V)
# Decoder layer
decoder_output = Dense(self.num_actions, activation='linear', name='value_estimate')(ego_attention_output)
return Model(inputs=input_layer, outputs=decoder_output)
def sample_action(self, observation):
if self.use_attention:
logits = tf.squeeze(self.value_function(observation), [0])
else:
logits = self.value_function(observation)
action = tf.squeeze(
tf.keras.random.categorical(logits, 1), axis=1
)
return logits, action
def evaluate(self, env, num_actors, max_steps):
returns, lengths, speeds = [], [], []
action_probs = np.zeros(self.num_actions)
for a in range(num_actors):
observation, _ = env.reset()
episode_return, episode_length = 0, 0
episode_speeds = []
for t in range(max_steps):
env.render()
if self.use_attention:
reshaped_observation = tf.expand_dims(observation, axis=0)
else:
reshaped_observation = tf.expand_dims(observation.flatten(), axis=0)
logits = self.value_function(reshaped_observation)
logprobs = tf.keras.ops.softmax(logits)
action = np.argmax(logprobs)
action_probs += tf.one_hot(action, self.num_actions).numpy()
observation, reward, done, _, info = env.step(action)
episode_length += 1
episode_return += reward
episode_speeds.append(info["speed"])
if done or (t == max_steps - 1):
returns.append(episode_return)
lengths.append(episode_length)
speeds.append(np.mean(episode_speeds))
break
env.close()
normalized_action_probs = action_probs / np.sum(action_probs)
return np.mean(returns), np.mean(lengths), np.mean(speeds), normalized_action_probs
def train(self, env, num_episodes, num_actors, max_steps, name, batch_size=32, eval_frequency=1):
returns_history, lengths_history, speeds_history = [], [], []
loss_history = []
action_probs_history = []
for episode in range(num_episodes):
action_buffer = []
observation_buffer = []
new_observation_buffer = []
done_buffer = []
reward_buffer = []
for _ in range(num_actors):
observation, _ = env.reset()
episode_reward = 0
for t in range(max_steps):
if self.use_attention:
reshaped_observation = tf.expand_dims(observation, axis=0)
else:
reshaped_observation = tf.expand_dims(observation.flatten(), axis=0)
logits, action = self.sample_action(reshaped_observation)
observation_new, reward, done, _, info = env.step(action[0].numpy())
if self.use_attention:
reshaped_observation_new = tf.expand_dims(observation_new, axis=0)
else:
reshaped_observation_new = tf.expand_dims(observation_new.flatten(), axis=0)
episode_reward += reward
# Save actions and states in replay buffer
action_buffer.append(action)
observation_buffer.append(reshaped_observation)
new_observation_buffer.append(reshaped_observation_new)
done_buffer.append(done)
reward_buffer.append(reward)
observation = observation_new
if done or t == (max_steps - 1):
break
new_observation_buffer = tf.cast(tf.squeeze(new_observation_buffer), dtype=tf.float32)
observation_buffer = tf.cast(tf.squeeze(observation_buffer), dtype=tf.float32)
action_buffer = tf.cast(tf.convert_to_tensor(action_buffer), dtype=tf.int32)
done_buffer = tf.cast(tf.convert_to_tensor(done_buffer), dtype=tf.float32)
reward_buffer = tf.cast(tf.convert_to_tensor(reward_buffer), dtype=tf.float32)
future_rewards = self.target_value_function(new_observation_buffer)
# Q value = reward + discount factor * expected future reward
if self.use_attention:
future_rewards = tf.squeeze(future_rewards)
updated_q_values = reward_buffer + self.gamma * tf.keras.ops.amax(
future_rewards, axis=1
)
# print(updated_q_values.shape)
# If final frame set the last value to -1
updated_q_values = updated_q_values * (1 - done_buffer) - done_buffer
            # Create a mask so we only calculate loss on the updated Q-values
            masks = tf.keras.ops.one_hot(tf.reshape(action_buffer, [-1]), self.num_actions)
            with tf.GradientTape() as tape:
                # Train the model on the states and updated Q-values
                q_values = self.value_function(observation_buffer)
                # Apply the mask to keep only the Q-value of the action taken
                q_action = tf.keras.ops.sum(tf.keras.ops.multiply(q_values, masks), axis=1)
                # Calculate loss between the target Q-values and the predicted ones
                loss = self.loss_function(updated_q_values, q_action)
# Backpropagation
grads = tape.gradient(loss, self.value_function.trainable_variables)
self.optimizer.apply_gradients(zip(grads, self.value_function.trainable_variables))
self.target_value_function.set_weights(self.value_function.get_weights())
loss_history.append(loss)
print(f"Loss: {loss}")
if episode % eval_frequency == 0:
mean_returns, mean_lengths, mean_speeds, action_probs = self.evaluate(env, num_actors, max_steps)
print(f"Epoch: {episode + 1}. Mean Return: {mean_returns}. Mean Length: {mean_lengths}. Mean speed: {mean_speeds}.")
returns_history.append(mean_returns)
lengths_history.append(mean_lengths)
speeds_history.append(mean_speeds)
action_probs_history.append(action_probs)
df = pd.DataFrame(columns=["returns", "lengths", "speeds", "action_probs", "loss"])
df["returns"] = returns_history
df["lengths"] = lengths_history
df["speeds"] = speeds_history
df["action_probs"] = action_probs_history
        df["loss"] = np.array(loss_history)
df.to_csv(f"{name}.csv")
self.value_function.save(f"{name}.keras")
return df
5.4 A3C¶
In the original A3C implementation, each parallel actor computes gradients that the global actor applies to perform gradient descent. We found this method suboptimal: because the parallel actors' weights are synchronised with the global actor after every gradient-descent iteration, it can lead to rapid convergence to a suboptimal policy. Instead, we perform gradient descent individually for each parallel actor, obtaining a vector of weights $(\theta_1,\theta_2,\ldots,\theta_n)$. These weights are then used to update the global actor's weights $\theta_0$ by computing a weighted average of the parallel actor weights: $$ \theta_0 \leftarrow \sum_{i=1}^{n}{\theta_i \cdot \frac{L - l(\theta_i)}{\sum_{j=1}^{n}{\left(L - l(\theta_j)\right)}}}, \qquad L = \sum_{j=1}^{n}{l(\theta_j)}$$ where $l(\theta_i)$ is the loss incurred by actor $i$ during its run. There are two main benefits to this approach. Firstly, by computing the weighted average we reward the parallel actors that performed better during their runs. Secondly, as each parallel actor has its own weights $\theta_i$ which are never synchronised with the global actor, in theory each actor should be able to explore a unique strategy.
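The update rule above can be sketched in plain NumPy, independently of the Keras models. Here `weighted_average_weights` and the toy two-parameter "actors" are illustrative names, not part of the agent code, and losses are assumed to be plain scalars:

```python
import numpy as np

def weighted_average_weights(actor_weights_arr, actor_losses):
    """Blend per-actor weight lists into one global weight list,
    giving lower-loss actors a larger share of the average."""
    losses = np.asarray(actor_losses, dtype=np.float64)
    total = losses.sum()
    # Each actor's share grows with how far its loss sits below the combined loss L.
    raw = total - losses
    shares = raw / raw.sum()
    # zip(*...) pairs up corresponding layers across actors.
    return [
        sum(share * np.asarray(layer) for share, layer in zip(shares, layers))
        for layers in zip(*actor_weights_arr)
    ]

# Two toy "actors", each with a single 2-parameter layer:
w = weighted_average_weights(
    [[np.array([1.0, 0.0])], [np.array([0.0, 1.0])]],
    [1.0, 3.0],  # the first actor achieved the lower loss
)
# w[0] → array([0.75, 0.25]): three quarters of the blend comes from actor 1
```

With losses $(1, 3)$, $L = 4$, so the shares are $(4-1)/4 = 0.75$ and $(4-3)/4 = 0.25$, matching the normalisation in `update_global_actor`.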
class A3CAgent():
def __init__(self,
input_shape,
num_actions,
num_actors=8,
actor_learning_rate=3e-4,
critic_learning_rate=3e-4,
use_attention=False,
encoding_dim=64,
num_heads=2):
self.input_shape = input_shape
self.num_actions = num_actions
self.num_actors = num_actors
self.actor_optimizers = [Adam(learning_rate=actor_learning_rate) for _ in range(num_actors)]
self.critic_optimizers = [Adam(learning_rate=critic_learning_rate) for _ in range(num_actors)]
self.use_attention = use_attention
if self.use_attention is False:
input_layer, action_output, value_output = self.build_mlp()
self.global_actor = Model(inputs=input_layer, outputs=action_output)
self.actors = []
self.critics = []
for _ in range(self.num_actors):
input_layer, action_output, value_output = self.build_mlp()
self.actors.append(Model(inputs=input_layer, outputs=action_output))
self.critics.append(Model(inputs=input_layer, outputs=value_output))
else:
self.encoding_dim = encoding_dim
self.num_heads = num_heads
input_layer, encoded_features, _ = self.build_ego_attention_network()
self.global_actor = self.build_attention_actor(input_layer, encoded_features)
self.actors = []
self.critics = []
for _ in range(self.num_actors):
input_layer, encoded_features, decoder_output = self.build_ego_attention_network()
self.actors.append(self.build_attention_actor(input_layer, encoded_features))
self.critics.append(self.build_attention_critic(input_layer, decoder_output))
def build_mlp(self):
input_layer = Input(shape=self.input_shape)
x = Dense(64, activation="tanh")(input_layer)
x = Dense(64, activation="tanh")(x)
action_output = Dense(self.num_actions)(x)
value_output = Dense(1, activation="linear")(x)
return input_layer, action_output, value_output
def build_ego_attention_network(self):
input_layer = Input(shape=self.input_shape, name='input_layer')
encoded_features = []
for i in range(self.input_shape[0]):
feature_vector = input_layer[:, i, :]
if i == 0:
# Ego encoder: Output L_q, L_k, L_v
L_q = Dense(self.encoding_dim, activation='linear', name=f'ego_encoding_L_q')(feature_vector)
L_k = Dense(self.encoding_dim, activation='linear', name=f'ego_encoding_L_k')(feature_vector)
L_v = Dense(self.encoding_dim, activation='linear', name=f'ego_encoding_L_v')(feature_vector)
ego_encoded = [L_q, L_k, L_v]
else:
# Regular encoder: Output L_k and L_v
L_k = Dense(self.encoding_dim, activation='linear', name=f'encoding_L_k_{i}')(feature_vector)
L_v = Dense(self.encoding_dim, activation='linear', name=f'encoding_L_v_{i}')(feature_vector)
encoded_features.extend([L_k, L_v])
# Ego attention layer
ego_attention = EgoAttentionLayer(num_heads=self.num_heads, encoding_dim=self.encoding_dim, name='ego_attention_layer')
ego_attention_output = ego_attention(ego_encoded[0], ego_encoded[1], ego_encoded[2])
# Decoder layer
decoder_output = Dense(1, activation='linear', name='value_estimate')(ego_attention_output)
return input_layer, encoded_features, decoder_output
def build_attention_actor(self, input_layer, encoded_features):
x = Concatenate()(encoded_features)
x = Flatten()(x)
x = Dense(64, activation="tanh")(x)
x = Dense(64, activation="tanh")(x)
outputs = Dense(self.num_actions)(x)
return Model(inputs=input_layer, outputs=outputs)
def build_attention_critic(self, input_layer, decoder_output):
return Model(inputs=input_layer, outputs=decoder_output)
def compute_entropy(self, logits):
        # Policy entropy: -sum(p * log p) over actions
        probabilities = tf.keras.ops.softmax(logits)
        logprobabilities = tf.keras.ops.log_softmax(logits)
        return -tf.reduce_sum(probabilities * logprobabilities, axis=1)
def sample_action(self, actor, observation):
logits = actor(observation)
# print(logits)
action = tf.squeeze(
tf.keras.random.categorical(logits, 1), axis=1
)
return logits, action
def logprobabilities(self, logits, action):
logprobabilities_all = tf.keras.ops.log_softmax(logits)
logprobability = tf.keras.ops.sum(
tf.keras.ops.one_hot(action, self.num_actions) * logprobabilities_all, axis=1
)
return logprobability
def train_critic(self, critic, critic_optimizer, observation_buffer, returns):
with tf.GradientTape() as tape:
loss = tf.keras.losses.Huber()(critic(observation_buffer), returns)
grads = tape.gradient(loss, critic.trainable_variables)
critic_optimizer.apply_gradients(zip(grads, critic.trainable_variables))
return loss
def evaluate_actor(self, env, num_actors, max_steps):
returns, lengths, speeds = [], [], []
action_probs = np.zeros(self.num_actions)
for a in range(num_actors):
observation, _ = env.reset()
episode_return, episode_length = 0, 0
episode_speeds = []
for t in range(max_steps):
# env.render()
if self.use_attention:
reshaped_observation = tf.expand_dims(observation, axis=0)
else:
reshaped_observation = tf.expand_dims(observation.flatten(), axis=0)
logits = self.global_actor(reshaped_observation)
logprobs = tf.keras.ops.softmax(logits)
action = np.argmax(logprobs)
observation, reward, done, _, info = env.step(action)
episode_length += 1
episode_return += reward
episode_speeds.append(info["speed"])
action_probs += tf.one_hot(action, self.num_actions).numpy()
                if done or (t == max_steps - 1):
                    returns.append(episode_return)
                    lengths.append(episode_length)
                    speeds.append(np.mean(episode_speeds))
                    break
env.close()
normalized_action_probs = action_probs / np.sum(action_probs)
return np.mean(returns), np.mean(lengths), np.mean(speeds), normalized_action_probs
def run_actor(self, env, actor, critic, actor_optimizer, critic_optimizer, max_steps, gamma=0.99, entropy_coefficient=0.01):
with tf.GradientTape(persistent=True) as tape:
observation_buffer = []
# action_buffer = []
value_buffer = []
return_buffer = []
logprobability_buffer = []
entropy_buffer = []
observation, _ = env.reset()
for t in range(max_steps):
# env.render(); Adding this line would show the attempts
if self.use_attention:
reshaped_observation = tf.expand_dims(observation, axis=0)
else:
reshaped_observation = tf.expand_dims(observation.flatten(), axis=0)
logits, action = self.sample_action(actor, reshaped_observation)
action = action[0].numpy()
observation_new, reward, done, _, info = env.step(action)
value_t = tf.keras.ops.squeeze(critic(reshaped_observation)).numpy()
logprobability_t = self.logprobabilities(logits, action)
entropy_t = self.compute_entropy(logits)
value_buffer.append(value_t)
entropy_buffer.append(entropy_t)
logprobability_buffer.append(logprobability_t)
return_buffer.append(reward)
# action_buffer.append(action)
observation_buffer.append(reshaped_observation)
# update observation
observation = observation_new
if done:
break
returns = []
discounted_sum = 0
for r in return_buffer[::-1]:
discounted_sum = r + gamma * discounted_sum
returns.insert(0, discounted_sum)
# train actor and critic
observation_buffer = tf.cast(tf.squeeze(tf.convert_to_tensor(observation_buffer)), dtype=tf.float32)
logprobability_buffer = tf.cast(tf.convert_to_tensor(logprobability_buffer), dtype=tf.float32)
returns = tf.cast(tf.convert_to_tensor(returns), dtype=tf.float32)
entropy_buffer = tf.cast(tf.convert_to_tensor(entropy_buffer), dtype=tf.float32)
value_buffer = tf.cast(tf.convert_to_tensor(value_buffer), dtype=tf.float32)
            # normalise returns to [-1, 1]; epsilon guards against a zero range
            returns = 2 * (returns - np.min(returns)) / (np.max(returns) - np.min(returns) + 1e-8) - 1
actor_loss = tf.reduce_mean(
tf.squeeze(-logprobability_buffer * (returns - value_buffer)
- tf.scalar_mul(entropy_coefficient, entropy_buffer))
)
actor_grads = tape.gradient(actor_loss, actor.trainable_variables)
actor_optimizer.apply_gradients(zip(actor_grads, actor.trainable_variables))
critic_loss = self.train_critic(critic, critic_optimizer, observation_buffer, returns)
return actor.get_weights(), actor_loss, critic_loss
def update_global_actor(self, actor_weights_arr, actor_losses):
total_loss = np.sum(actor_losses)
# compute weights for the weighted average (not the same as model weights!)
weights = [(total_loss - loss) for loss in actor_losses]
total_weights = np.sum(weights)
normalized_weights = [(weight / total_weights) for weight in weights]
        average_actor_weights = [np.zeros_like(layer) for layer in actor_weights_arr[0]]
        # compute weighted average of the parallel actor weights
        for actor, weight in zip(actor_weights_arr, normalized_weights):
            for l, layer in enumerate(actor):
                average_actor_weights[l] += np.asarray(layer) * weight
self.global_actor.set_weights(average_actor_weights)
print(f"Average Actor Loss: {np.mean(actor_losses)}")
def train(self, envs, num_episodes, num_actors, name, max_steps=128, eval_frequency=1, entropy_coefficient=0.01):
returns_history, lengths_history, speeds_history = [], [], []
actor_losses_history, critic_losses_history = [], []
action_probs_history = []
for episode in range(num_episodes):
actor_weights_arr = []
actor_losses = []
critic_losses = []
with concurrent.futures.ThreadPoolExecutor(max_workers=num_actors) as executor:
futures = []
for env, actor, critic, actor_optimizer, critic_optimizer in zip(envs, self.actors, self.critics, self.actor_optimizers, self.critic_optimizers):
futures.append(executor.submit(self.run_actor, env, actor, critic, actor_optimizer, critic_optimizer, max_steps))
# Collect results
for future in concurrent.futures.as_completed(futures):
actor_weights, actor_loss, critic_loss = future.result()
actor_weights_arr.append(actor_weights)
actor_losses.append(actor_loss)
critic_losses.append(critic_loss)
# update global actor
self.update_global_actor(actor_weights_arr, actor_losses)
actor_losses_history.append(np.mean(actor_losses))
critic_losses_history.append(np.mean(critic_losses))
if episode % eval_frequency == 0:
mean_returns, mean_lengths, mean_speeds, action_probs = self.evaluate_actor(envs[0], num_actors, max_steps)
print(f"Epoch: {episode + 1}. Mean Return: {mean_returns}. Mean Length: {mean_lengths}. Mean speed: {mean_speeds}.")
returns_history.append(mean_returns)
lengths_history.append(mean_lengths)
speeds_history.append(mean_speeds)
action_probs_history.append(action_probs)
df = pd.DataFrame(columns=["returns", "lengths", "speeds", "action_probs", "actor_loss", "critic_loss"])
df["returns"] = returns_history
df["lengths"] = lengths_history
df["speeds"] = speeds_history
df["action_probs"] = action_probs_history
df["actor_loss"] = np.array(actor_losses_history)
df["critic_loss"] = np.array(critic_losses_history)
df.to_csv(f"{name}.csv")
self.global_actor.save(f"{name}.keras")
return df
5.5 PPO¶
class PPOAgent():
def __init__(self,
input_shape,
buffer_size,
num_actions,
encoding_dim=64,
num_heads=2,
use_attention=False,
actor_learning_rate=3e-4,
critic_learning_rate=3e-4):
self.input_shape = input_shape
self.num_actions = num_actions
self.encoding_dim = encoding_dim
self.num_heads = num_heads
self.actor_optimizer = tf.keras.optimizers.Adam(learning_rate=actor_learning_rate)
self.critic_optimizer = tf.keras.optimizers.Adam(learning_rate=critic_learning_rate)
self.use_attention = use_attention
if self.use_attention is False:
self.buffer = Buffer(input_shape[0], buffer_size)
input_layer, action_output, value_output = self.build_mlp()
self.actor = Model(inputs=input_layer, outputs=action_output)
self.critic = Model(inputs=input_layer, outputs=value_output)
else:
self.buffer = Buffer(input_shape[0]*input_shape[1], buffer_size)
self.encoding_dim = encoding_dim
self.num_heads = num_heads
input_layer, encoded_features, decoder_output = self.build_ego_attention_network()
self.actor = self.build_attention_actor(input_layer, encoded_features)
self.critic = self.build_attention_critic(input_layer, decoder_output)
def build_mlp(self):
input_layer = Input(shape=self.input_shape)
x = Dense(64, activation="tanh")(input_layer)
x = Dense(64, activation="tanh")(x)
action_output = Dense(self.num_actions)(x)
value_output = Dense(1, activation="linear")(x)
return input_layer, action_output, value_output
def build_ego_attention_network(self):
input_layer = Input(shape=self.input_shape, name='input_layer')
encoded_features = []
for i in range(self.input_shape[0]):
feature_vector = input_layer[:, i, :]
if i == 0:
# Ego encoder: Output L_q, L_k, L_v
L_q = Dense(self.encoding_dim, activation='linear', name=f'ego_encoding_L_q')(feature_vector)
L_k = Dense(self.encoding_dim, activation='linear', name=f'ego_encoding_L_k')(feature_vector)
L_v = Dense(self.encoding_dim, activation='linear', name=f'ego_encoding_L_v')(feature_vector)
ego_encoded = [L_q, L_k, L_v]
else:
# Regular encoder: Output L_k and L_v
L_k = Dense(self.encoding_dim, activation='linear', name=f'encoding_L_k_{i}')(feature_vector)
L_v = Dense(self.encoding_dim, activation='linear', name=f'encoding_L_v_{i}')(feature_vector)
encoded_features.extend([L_k, L_v])
# Ego attention layer
ego_attention = EgoAttentionLayer(num_heads=self.num_heads, encoding_dim=self.encoding_dim, name='ego_attention_layer')
ego_attention_output = ego_attention(ego_encoded[0], ego_encoded[1], ego_encoded[2])
# Decoder layer
decoder_output = Dense(1, activation='linear', name='value_estimate')(ego_attention_output)
return input_layer, encoded_features, decoder_output
def build_attention_actor(self, input_layer, encoded_features):
x = Concatenate()(encoded_features)
x = Flatten()(x)
x = Dense(64, activation="tanh")(x)
x = Dense(64, activation="tanh")(x)
outputs = Dense(self.num_actions)(x)
return Model(inputs=input_layer, outputs=outputs)
def build_attention_critic(self, input_layer, decoder_output):
return Model(inputs=input_layer, outputs=decoder_output)
    def train(self, env, num_epochs, num_actors, max_steps, name, train_policy_iterations=100, eval_frequency=5):
returns_arr, lengths_arr = [], []
for epoch in range(num_epochs):
returns, lengths, speeds = [], [], []
for a in range(num_actors):
observation, _ = env.reset()
for t in range(max_steps):
# Get the logits, action, and take one step in the environment
if self.use_attention:
reshaped_observation = tf.expand_dims(observation, axis=0)
else:
reshaped_observation = tf.expand_dims(observation.flatten(), axis=0)
logits, action = self.sample_action(reshaped_observation)
observation_new, reward, done, _, info = env.step(action[0].numpy())
# reward = reward_function(observation_new, info)
# Get the value and log-probability of the action
value_t = tf.keras.ops.squeeze(self.critic(reshaped_observation)).numpy()
logprobability_t = self.logprobabilities(logits, action)
# Store obs, act, rew, v_t, logp_pi_t
self.buffer.store(observation.reshape(1,-1), action, reward, value_t, logprobability_t)
# Update the observation
observation = observation_new
# Finish trajectory if reached to a terminal state
if done or (t == max_steps - 1):
last_value = 0 if done else tf.keras.ops.squeeze(self.critic(reshaped_observation)).numpy()
self.buffer.finish_trajectory(last_value)
break
# Get values from the buffer
(
observation_buffer,
action_buffer,
advantage_buffer,
return_buffer,
logprobability_buffer,
) = self.buffer.get()
# Update the policy and implement early stopping using KL divergence
for i in range(train_policy_iterations):
kl, actor_loss = self.train_actor(
observation_buffer, action_buffer, logprobability_buffer, advantage_buffer
)
critic_loss = self.train_critic(observation_buffer, return_buffer)
# if kl > 1.5 * target_kl:
# # Early Stopping
# break
print(f"Actor loss: {actor_loss}. Critic loss: {critic_loss}")
            if epoch % eval_frequency == 0:
                mean_returns, mean_lengths, mean_speeds = self.evaluate_actor(env, num_actors, max_steps)
                print(f"Epoch: {epoch + 1}. Mean Return: {mean_returns}. Mean Length: {mean_lengths}. Mean speed: {mean_speeds}.")
                returns_arr.append(mean_returns)
                lengths_arr.append(mean_lengths)
        return returns_arr, lengths_arr
def sample_action(self, observation):
logits = self.actor(observation)
action = tf.squeeze(
tf.keras.random.categorical(logits, 1), axis=1
)
return logits, action
def logprobabilities(self, logits, action):
logprobabilities_all = tf.keras.ops.log_softmax(logits)
logprobability = tf.keras.ops.sum(
tf.keras.ops.one_hot(action, self.num_actions) * logprobabilities_all, axis=1
)
return logprobability
def train_actor(self,
observation_buffer,
action_buffer,
logprobability_buffer,
advantage_buffer,
clip_ratio=0.2):
if self.use_attention:
observation_buffer = observation_buffer.reshape(-1, 5, 5)
else:
observation_buffer = observation_buffer.reshape(-1,25)
with tf.GradientTape() as tape: # Record operations for automatic differentiation.
ratio = tf.keras.ops.exp(
self.logprobabilities(self.actor(observation_buffer), action_buffer)
- logprobability_buffer
)
min_advantage = tf.keras.ops.where(
advantage_buffer > 0,
(1 + clip_ratio) * advantage_buffer,
(1 - clip_ratio) * advantage_buffer,
)
loss = -tf.keras.ops.mean(
tf.keras.ops.minimum(ratio * advantage_buffer, min_advantage)
)
grads = tape.gradient(loss, self.actor.trainable_variables)
self.actor_optimizer.apply_gradients(zip(grads, self.actor.trainable_variables))
        kl = tf.keras.ops.mean(
            logprobability_buffer
            - self.logprobabilities(self.actor(observation_buffer), action_buffer)
        )
return kl, loss
def train_critic(self, observation_buffer, return_buffer):
if self.use_attention:
observation_buffer = observation_buffer.reshape(-1, 5, 5)
else:
observation_buffer = observation_buffer.reshape(-1,25)
with tf.GradientTape() as tape: # Record operations for automatic differentiation.
loss = tf.keras.ops.mean((return_buffer - self.critic(observation_buffer)) ** 2)
grads = tape.gradient(loss, self.critic.trainable_variables)
self.critic_optimizer.apply_gradients(zip(grads, self.critic.trainable_variables))
return loss
def evaluate_actor(self, env, num_actors, max_steps):
returns, lengths, speeds = [], [], []
for a in range(num_actors):
observation, _ = env.reset()
episode_return, episode_length = 0, 0
episode_speeds = []
for t in range(max_steps):
env.render()
if self.use_attention:
reshaped_observation = tf.expand_dims(observation, axis=0)
else:
reshaped_observation = tf.expand_dims(observation.flatten(), axis=0)
logits = self.actor(reshaped_observation)
logprobs = tf.keras.ops.softmax(logits)
action = np.argmax(logprobs)
observation, reward, done, _, info = env.step(action)
episode_length += 1
episode_return += reward
episode_speeds.append(info["speed"])
if done or (t == max_steps - 1):
returns.append(episode_return)
lengths.append(episode_length)
speeds.append(np.mean(episode_speeds))
break
env.close()
return np.mean(returns), np.mean(lengths), np.mean(speeds)
6. Training and Results¶
6.1 Training¶
Due to computational constraints, results were computed by training each agent for 200 episodes using 8 actors. Each agent was trained with and without attention on each of the three environments. Throughout training, a number of metrics were tracked, including mean episode duration, mean episode return, mean vehicle speed, and the probability of each possible action taken. After each training run, results were saved to a CSV file (see the results folder) and model weights were saved to a .keras file (see the models folder).
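A saved training log can be inspected with a few lines of pandas. The DataFrame below is a hypothetical in-memory stand-in with the same columns the `train()` methods write; a real run would instead call `pd.read_csv(r"results\a3c_merge.csv")` on one of the files in the results folder:

```python
import pandas as pd

# Hypothetical stand-in for one training log ("returns", "lengths", "speeds"):
log = pd.DataFrame({
    "returns": [4.0, 6.0, 8.0],
    "lengths": [10, 12, 14],
    "speeds": [22.0, 24.0, 26.0],
})
# One aggregate per column: average return and episode length, top speed.
summary = log.agg({"returns": "mean", "lengths": "mean", "speeds": "max"})
best_episode = int(log["returns"].idxmax())  # index of the highest-return evaluation
```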
N_EPISODES=200
N_ACTORS=8
MAX_STEPS_MERGE=128
MAX_STEPS_ROUNDABOUT=64
MAX_STEPS_HIGHWAY=64
# Training Merge
parallel_merge_envs = [gym.make("merge-v0") for _ in range(8)]
for env in parallel_merge_envs:
env.configure(merge_env_config)
num_actions = merge_env.action_space.n
input_shape = (merge_env.observation_space.shape[0] * merge_env.observation_space.shape[1],)
input_shape_attention = merge_env.observation_space.shape
dqn_agent = DQNAgent(input_shape=input_shape,
num_actions=num_actions)
dqn_attention_agent = DQNAgent(input_shape=input_shape_attention,
num_actions=num_actions,
use_attention=True)
a3c_agent = A3CAgent(
input_shape=input_shape,
num_actions=num_actions,
)
a3c_attention_agent = A3CAgent(
input_shape=input_shape_attention,
num_actions=num_actions,
use_attention=True
)
ppo_agent = PPOAgent(input_shape=input_shape,
buffer_size=MAX_STEPS_MERGE * N_ACTORS,
num_actions=num_actions)
ppo_attention_agent = PPOAgent(input_shape=input_shape_attention,
buffer_size=MAX_STEPS_MERGE * N_ACTORS,
num_actions=num_actions,
use_attention=True)
"""
dqn_agent.train(env=merge_env,
num_episodes=N_EPISODES,
num_actors=N_ACTORS,
max_steps=MAX_STEPS_MERGE,
name="dqn_merge")
dqn_attention_agent.train(env=merge_env,
num_episodes=N_EPISODES,
num_actors=N_ACTORS,
max_steps=MAX_STEPS_MERGE,
name="dqn_attention_merge")
a3c_agent.train(
envs=parallel_merge_envs,
num_episodes=N_EPISODES,
num_actors=N_ACTORS,
max_steps=MAX_STEPS_MERGE,
name="a3c_merge"
)
a3c_attention_agent.train(
envs=parallel_merge_envs,
num_episodes=N_EPISODES,
num_actors=N_ACTORS,
max_steps=MAX_STEPS_MERGE,
name="a3c_attention_merge"
)
ppo_agent.train(env=merge_env,
                num_epochs=N_EPISODES,
                num_actors=N_ACTORS,
                max_steps=MAX_STEPS_MERGE,
                train_policy_iterations=100,
                name="ppo_merge")
ppo_attention_agent.train(env=merge_env,
                          num_epochs=N_EPISODES,
                          num_actors=N_ACTORS,
                          max_steps=MAX_STEPS_MERGE,
                          train_policy_iterations=100,
                          name="ppo_attention_merge")
"""
dqn_merge_results = pd.read_csv(r"results\dqn_merge.csv")
dqn_attention_merge_results = pd.read_csv(r"results\dqn_attention_merge.csv")
a3c_merge_results = pd.read_csv(r"results\a3c_merge.csv")
a3c_attention_merge_results = pd.read_csv(r"results\a3c_attention_merge.csv")
ppo_merge_results = pd.read_csv(r"results\ppo_merge.csv")
ppo_attention_merge_results = pd.read_csv(r"results\ppo_attention_merge.csv")
# Training Roundabout
parallel_roundabout_envs = [gym.make("roundabout-v0") for _ in range(8)]
for env in parallel_roundabout_envs:
env.configure(roundabout_env_config)
num_actions = roundabout_env.action_space.n
input_shape = (roundabout_env.observation_space.shape[0] * roundabout_env.observation_space.shape[1],)
input_shape_attention = roundabout_env.observation_space.shape
dqn_agent = DQNAgent(input_shape=input_shape,
num_actions=num_actions)
dqn_attention_agent = DQNAgent(input_shape=input_shape_attention,
num_actions=num_actions,
use_attention=True)
a3c_agent = A3CAgent(
input_shape=input_shape,
num_actions=num_actions,
)
a3c_attention_agent = A3CAgent(
input_shape=input_shape_attention,
num_actions=num_actions,
use_attention=True
)
ppo_agent = PPOAgent(input_shape=input_shape,
                     buffer_size=MAX_STEPS_ROUNDABOUT * N_ACTORS,
                     num_actions=num_actions)
ppo_attention_agent = PPOAgent(input_shape=input_shape_attention,
                               buffer_size=MAX_STEPS_ROUNDABOUT * N_ACTORS,
                               num_actions=num_actions,
                               use_attention=True)
"""
dqn_agent.train(env=roundabout_env,
num_episodes=N_EPISODES,
num_actors=N_ACTORS,
max_steps=MAX_STEPS_ROUNDABOUT,
name="dqn_roundabout")
dqn_attention_agent.train(env=roundabout_env,
num_episodes=N_EPISODES,
num_actors=N_ACTORS,
max_steps=MAX_STEPS_ROUNDABOUT,
name="dqn_attention_roundabout")
a3c_agent.train(
envs=parallel_roundabout_envs,
num_episodes=N_EPISODES,
num_actors=N_ACTORS,
max_steps=MAX_STEPS_ROUNDABOUT,
name="a3c_roundabout"
)
a3c_attention_agent.train(
envs=parallel_roundabout_envs,
num_episodes=N_EPISODES,
num_actors=N_ACTORS,
max_steps=MAX_STEPS_ROUNDABOUT,
name="a3c_attention_roundabout"
)
ppo_agent.train(env=roundabout_env,
                num_epochs=N_EPISODES,
                num_actors=N_ACTORS,
                max_steps=MAX_STEPS_ROUNDABOUT,
                train_policy_iterations=100,
                name="ppo_roundabout")
ppo_attention_agent.train(env=roundabout_env,
                          num_epochs=N_EPISODES,
                          num_actors=N_ACTORS,
                          max_steps=MAX_STEPS_ROUNDABOUT,
                          train_policy_iterations=100,
                          name="ppo_attention_roundabout")
"""
dqn_roundabout_results = pd.read_csv(r"results\dqn_roundabout.csv")
dqn_attention_roundabout_results = pd.read_csv(r"results\dqn_attention_roundabout.csv")
a3c_roundabout_results = pd.read_csv(r"results\a3c_roundabout.csv")
a3c_attention_roundabout_results = pd.read_csv(r"results\a3c_attention_roundabout.csv")
ppo_roundabout_results = pd.read_csv(r"results\ppo_roundabout.csv")
ppo_attention_roundabout_results = pd.read_csv(r"results\ppo_attention_roundabout.csv")
# Training Highway
parallel_highway_envs = [gym.make("highway-v0") for _ in range(8)]
for env in parallel_highway_envs:
env.configure(highway_env_config)
num_actions = highway_env.action_space.n
input_shape = (highway_env.observation_space.shape[0] * highway_env.observation_space.shape[1],)
input_shape_attention = highway_env.observation_space.shape
dqn_agent = DQNAgent(input_shape=input_shape,
num_actions=num_actions)
dqn_attention_agent = DQNAgent(input_shape=input_shape_attention,
num_actions=num_actions,
use_attention=True)
a3c_agent = A3CAgent(
input_shape=input_shape,
num_actions=num_actions,
)
a3c_attention_agent = A3CAgent(
input_shape=input_shape_attention,
num_actions=num_actions,
use_attention=True
)
ppo_agent = PPOAgent(input_shape=input_shape,
                     buffer_size=MAX_STEPS_HIGHWAY * N_ACTORS,
                     num_actions=num_actions)
ppo_attention_agent = PPOAgent(input_shape=input_shape_attention,
                               buffer_size=MAX_STEPS_HIGHWAY * N_ACTORS,
                               num_actions=num_actions,
                               use_attention=True)
"""
dqn_agent.train(env=highway_env,
                num_episodes=N_EPISODES,
                num_actors=N_ACTORS,
                max_steps=MAX_STEPS_HIGHWAY,
                name="dqn_highway")
dqn_attention_agent.train(env=highway_env,
                          num_episodes=N_EPISODES,
                          num_actors=N_ACTORS,
                          max_steps=MAX_STEPS_HIGHWAY,
                          name="dqn_attention_highway")
a3c_agent.train(
    envs=parallel_highway_envs,
    num_episodes=N_EPISODES,
    num_actors=N_ACTORS,
    max_steps=MAX_STEPS_HIGHWAY,
    name="a3c_highway"
)
a3c_attention_agent.train(
    envs=parallel_highway_envs,
    num_episodes=N_EPISODES,
    num_actors=N_ACTORS,
    max_steps=MAX_STEPS_HIGHWAY,
    name="a3c_attention_highway"
)
ppo_agent.train(env=highway_env,
                num_epochs=N_EPISODES,
                num_actors=N_ACTORS,
                max_steps=MAX_STEPS_HIGHWAY,
                train_policy_iterations=100,
                name="ppo_highway")
ppo_attention_agent.train(env=highway_env,
                          num_epochs=N_EPISODES,
                          num_actors=N_ACTORS,
                          max_steps=MAX_STEPS_HIGHWAY,
                          train_policy_iterations=100,
                          name="ppo_attention_highway")
"""
dqn_highway_results = pd.read_csv(r"results\dqn_highway.csv")
dqn_attention_highway_results = pd.read_csv(r"results\dqn_attention_highway.csv")
a3c_highway_results = pd.read_csv(r"results\a3c_highway.csv")
a3c_attention_highway_results = pd.read_csv(r"results\a3c_attention_highway.csv")
ppo_highway_results = pd.read_csv(r"results\ppo_highway.csv")
ppo_attention_highway_results = pd.read_csv(r"results\ppo_attention_highway.csv")
def act(actor, observation, attention):
if not attention:
observation = observation.reshape(1, -1)
else:
observation = tf.expand_dims(observation, axis=0)
logits = actor(observation)
logprobs = tf.keras.ops.softmax(logits)
action = np.argmax(logprobs)
return action
def run_agent(actor, env, attention=False, num_episodes=5, max_steps=128, video_save_path='output_video.mp4'):
env.reset()
video_writer = cv2.VideoWriter(video_save_path, cv2.VideoWriter_fourcc(*'X264'), 30, (env.render().shape[1], env.render().shape[0]))
for i in range(num_episodes):
observation, _= env.reset()
done = False
frames = []
for _ in range(max_steps):
frame = env.render()
video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
action = act(actor, observation, attention)
observation, reward, done, _, info = env.step(action)
if done:
break
video_writer.release()
env.close()
dqn_merge_actor = tf.keras.models.load_model(r"models\dqn_merge.keras")
dqn_highway_actor = tf.keras.models.load_model(r"models\dqn_highway.keras")
dqn_roundabout_actor = tf.keras.models.load_model(r"models\dqn_roundabout.keras")
dqn_merge_attention_actor = tf.keras.models.load_model(r"models\dqn_attention_merge.keras", custom_objects={'EgoAttentionLayer': EgoAttentionLayer})
dqn_highway_attention_actor = tf.keras.models.load_model(r"models\dqn_attention_highway.keras", custom_objects={'EgoAttentionLayer': EgoAttentionLayer})
dqn_roundabout_attention_actor = tf.keras.models.load_model(r"models\dqn_attention_roundabout.keras", custom_objects={'EgoAttentionLayer': EgoAttentionLayer})
a3c_merge_actor = tf.keras.models.load_model(r"models\a3c_merge.keras")
a3c_highway_actor = tf.keras.models.load_model(r"models\a3c_highway.keras")
a3c_roundabout_actor = tf.keras.models.load_model(r"models\a3c_roundabout.keras")
a3c_merge_attention_actor = tf.keras.models.load_model(r"models\a3c_attention_merge.keras", custom_objects={'EgoAttentionLayer': EgoAttentionLayer})
a3c_highway_attention_actor = tf.keras.models.load_model(r"models\a3c_attention_highway.keras", custom_objects={'EgoAttentionLayer': EgoAttentionLayer})
a3c_roundabout_attention_actor = tf.keras.models.load_model(r"models\a3c_attention_roundabout.keras", custom_objects={'EgoAttentionLayer': EgoAttentionLayer})
ppo_merge_actor = tf.keras.models.load_model(r"models\ppo_merge.keras")
ppo_highway_actor = tf.keras.models.load_model(r"models\ppo_highway.keras")
ppo_roundabout_actor = tf.keras.models.load_model(r"models\ppo_roundabout.keras")
ppo_merge_attention_actor = tf.keras.models.load_model(r"models\ppo_attention_merge.keras", custom_objects={'EgoAttentionLayer': EgoAttentionLayer})
ppo_highway_attention_actor = tf.keras.models.load_model(r"models\ppo_attention_highway.keras", custom_objects={'EgoAttentionLayer': EgoAttentionLayer})
ppo_roundabout_attention_actor = tf.keras.models.load_model(r"models\ppo_attention_roundabout.keras", custom_objects={'EgoAttentionLayer': EgoAttentionLayer})
"""
run_agent(actor=dqn_merge_actor, env=merge_env, video_save_path=r"videos\dqn_merge.mp4")
run_agent(actor=dqn_highway_actor, env=highway_env, video_save_path=r"videos\dqn_highway.mp4")
run_agent(actor=dqn_roundabout_actor, env=roundabout_env, max_steps=64, video_save_path=r"videos\dqn_roundabout.mp4")
run_agent(actor=a3c_merge_actor, env=merge_env, video_save_path=r"videos\a3c_merge.mp4")
run_agent(actor=a3c_highway_actor, env=highway_env, video_save_path=r"videos\a3c_highway.mp4")
run_agent(actor=a3c_roundabout_actor, env=roundabout_env, max_steps=64, video_save_path=r"videos\a3c_roundabout.mp4")
run_agent(actor=ppo_merge_actor, env=merge_env, video_save_path=r"videos\ppo_merge.mp4")
run_agent(actor=ppo_highway_actor, env=highway_env, video_save_path=r"videos\ppo_highway.mp4")
run_agent(actor=ppo_roundabout_actor, env=roundabout_env, max_steps=64, video_save_path=r"videos\ppo_roundabout.mp4")
run_agent(actor=dqn_merge_attention_actor, env=merge_env, attention=True, video_save_path=r"videos\dqn_merge_attention.mp4")
run_agent(actor=dqn_highway_attention_actor, env=highway_env, attention=True, video_save_path=r"videos\dqn_highway_attention.mp4")
run_agent(actor=dqn_roundabout_attention_actor, env=roundabout_env, attention=True, max_steps=64, video_save_path=r"videos\dqn_roundabout_attention.mp4")
run_agent(actor=a3c_merge_attention_actor, env=merge_env, attention=True, video_save_path=r"videos\a3c_merge_attention.mp4")
run_agent(actor=a3c_highway_attention_actor, env=highway_env, attention=True, video_save_path=r"videos\a3c_highway_attention.mp4")
run_agent(actor=a3c_roundabout_attention_actor, env=roundabout_env, attention=True, max_steps=64, video_save_path=r"videos\a3c_roundabout_attention.mp4")
run_agent(actor=ppo_merge_attention_actor, env=merge_env, attention=True, video_save_path=r"videos\ppo_merge_attention.mp4")
run_agent(actor=ppo_highway_attention_actor, env=highway_env, attention=True, video_save_path=r"videos\ppo_highway_attention.mp4")
run_agent(actor=ppo_roundabout_attention_actor, env=roundabout_env, attention=True, max_steps=64, video_save_path=r"videos\ppo_roundabout_attention.mp4")
"""
6.2 Evaluation metrics¶
In our project, we employ a set of distinct metrics to evaluate the training process of each model, moving beyond the conventional approach of solely using total rewards (mean returns).
Total Number of Steps per Episode (Mean Length): We monitor the total number of steps taken in each episode across the different environments. Each environment has a predefined maximum number of steps. For example, in the "merge" environment, this maximum is set by the number of steps the ego vehicle takes until the merging vehicle successfully integrates into the traffic. In contrast, for the "roundabout" and "highway" environments, we establish episode lengths that adequately reflect typical behavioral patterns (64 and 128 steps, respectively). This metric helps assess the safety of the algorithms: consistently reaching the maximum step count suggests that an agent's policy has stabilised at a safe behavior.
Average Speed per Episode (Mean Speed): To gauge the efficiency of each policy, we measure the average speed maintained throughout each episode. A higher average speed indicates a more efficient handling of driving tasks, potentially enhancing overall traffic flow and reducing travel times.
Action Distribution Analysis (Action Probabilities): We also analyze the multinomial distributions of the five permissible actions to observe the diversity and evolution of decision-making strategies across models and environments. This analysis helps us determine whether the policy converges towards a balanced mix of actions or becomes skewed towards a dominant action, providing insights into the adaptability and robustness of the driving strategy.
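A convenient scalar summary of this action diversity is the Shannon entropy of the action distribution, which is maximal for a uniform policy and zero for a policy that always picks the same action. A minimal sketch (this quantity is not computed in the notebook itself):

```python
import numpy as np

def policy_entropy(action_probs, eps=1e-12):
    # Shannon entropy of a categorical action distribution:
    # log(n_actions) for a uniform policy, 0 for a collapsed one.
    p = np.asarray(action_probs, dtype=np.float64)
    return float(-np.sum(p * np.log(p + eps)))

uniform = policy_entropy([0.2] * 5)                      # log(5), maximal diversity
collapsed = policy_entropy([1.0, 0.0, 0.0, 0.0, 0.0])    # ~0, single dominant action
```

Tracking this value per episode would quantify the convergence-towards-a-dominant-action behaviour discussed below.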
6.3 Numerical results¶
In this section, we evaluate the performance of the algorithms across various environments.
Merge¶
We begin our analysis by illustrating the general behavior of each RL agent below. On the left-hand side, we display the three models operating in the 'merge' environment without an integrated attention-based framework, following the completion of their training. On the right-hand side, we present the corresponding models after integrating the attention mechanism.
class VideoGrid(widgets.VBox):
    def __init__(self, video_paths, video_captions):
        self.videos = [widgets.Video.from_file(path, controls=True) for path in video_paths]
        self.captions = [widgets.Label(value=caption) for caption in video_captions]
        rows = []
        for i in range(3):
            row = []
            for j in range(2):
                # Column 0 holds the plain models, column 1 the attention variants.
                row.append(widgets.VBox(children=[self.videos[i + 3 * j], self.captions[i + 3 * j]]))
            rows.append(widgets.HBox(children=row))
        super().__init__(children=rows)
video_paths = [r"videos\dqn_merge.mp4", r"videos\a3c_merge.mp4", r"videos\ppo_merge.mp4", r"videos\dqn_merge_attention.mp4", r"videos\a3c_merge_attention.mp4", r"videos\ppo_merge_attention.mp4"]
video_captions = ["DQN", "A3C", "PPO", "DQN with attention", "A3C with attention", "PPO with attention"]
video_grid = VideoGrid(video_paths, video_captions)
display(video_grid)
We proceed with a comparison of evaluation metrics for the models, as illustrated in the plots below. Similar to the videos, the algorithms without attention are presented on the left-hand side, while those with integrated attention are on the right.
# Create subplots
fig = make_subplots(rows=3, cols=2)
episode = np.arange(1,201)
# Add line charts to the subplots
fig.add_trace(go.Scatter(y=dqn_merge_results["returns"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779')), row=1, col=1)
fig.add_trace(go.Scatter(y=dqn_merge_results["lengths"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=dqn_merge_results["speeds"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=dqn_attention_merge_results["returns"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_merge_results["lengths"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_merge_results["speeds"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=a3c_merge_results["returns"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e')), row=1, col=1)
fig.add_trace(go.Scatter(y=a3c_merge_results["lengths"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=a3c_merge_results["speeds"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=a3c_attention_merge_results["returns"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_merge_results["lengths"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_merge_results["speeds"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=ppo_merge_results["returns"], x=episode, mode='lines', name='PPO', line=dict(color='#440154')), row=1, col=1)
fig.add_trace(go.Scatter(y=ppo_merge_results["lengths"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=ppo_merge_results["speeds"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=ppo_attention_merge_results["returns"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_merge_results["lengths"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_merge_results["speeds"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=2)
fig.update_layout(
    annotations=[
        dict(
            text="Without attention",  # subtitle for the left column
            xref="x1",                 # x relative to the first subplot's x-axis
            yref="paper",              # y relative to the entire figure
            x=1,                       # right edge of the subplot
            y=1.05,                    # just above the plotting area
            showarrow=False,
            font=dict(size=14)
        ),
        dict(
            text="With attention",     # subtitle for the right column
            xref="x2",                 # x relative to the second subplot's x-axis
            yref="paper",
            x=1,
            y=1.05,
            showarrow=False,
            font=dict(size=14)
        )
    ]
)
fig.update_layout(
    title='Numerical Results for Merge Environment',
    xaxis1=dict(gridcolor='gray', title='Episode'),
    yaxis1=dict(gridcolor='gray', title='Mean Return'),
    xaxis2=dict(gridcolor='gray', title='Episode'),
    yaxis2=dict(gridcolor='gray', title='Mean Return'),
    xaxis3=dict(gridcolor='gray', title='Episode'),
    yaxis3=dict(gridcolor='gray', title='Mean Length'),
    xaxis4=dict(gridcolor='gray', title='Episode'),
    yaxis4=dict(gridcolor='gray', title='Mean Length'),
    xaxis5=dict(gridcolor='gray', title='Episode'),
    yaxis5=dict(gridcolor='gray', title='Mean Speed'),
    xaxis6=dict(gridcolor='gray', title='Episode'),
    yaxis6=dict(gridcolor='gray', title='Mean Speed'),
    width=1000,
    height=900,
    plot_bgcolor='white'
)
# Show plot
fig.show()
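The qualitative reading of these curves can be made concrete by averaging each metric over the final episodes. A minimal sketch, using a synthetic stand-in for the results DataFrames loaded above (same `returns`/`lengths`/`speeds` columns; the values are illustrative, not the measured ones):

```python
import numpy as np
import pandas as pd

def tail_summary(results, window=50):
    # Mean of each training metric over the last `window` episodes,
    # a simple scalar summary of where an agent converged.
    return results[["returns", "lengths", "speeds"]].tail(window).mean()

# Synthetic stand-in with the same columns as e.g. dqn_merge_results.
rng = np.random.default_rng(0)
results = pd.DataFrame({
    "returns": rng.normal(10.0, 1.0, 200),
    "lengths": rng.integers(20, 129, 200),
    "speeds": rng.normal(25.0, 2.0, 200),
})
summary = tail_summary(results)
```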
The first two plots (top row) present the Mean Return over the 200 training episodes. For the models using the MLP architecture (without attention), both A3C and PPO reach higher total rewards faster than DQN. Although DQN's rewards increase markedly after a short period, they consistently remain below those of the other two algorithms, with A3C performing best in the final stages of training.
This pattern shifts with the integration of attention. In this scenario, the on-policy models again outperform the off-policy DQN, but PPO demonstrates superior results, surpassing both A3C and DQN.
The second set of plots (middle row) illustrates the Mean Length over the 200 training episodes. Given that the merge environment is goal-based (reaching a specific state with no time limit), the Mean Length should be read alongside the Mean Speed (bottom row). For the MLP architecture, both DQN and PPO exhibit volatile step counts, while A3C eventually reaches higher ones. However, the Mean Speed plots show that DQN and PPO maintain higher average speeds than A3C, suggesting that A3C converges to a generally slower, more time-consuming behavior.
In the attention-integrated framework, both DQN and A3C display stable behavior. DQN consistently performs very fast, short runs, while A3C opts for longer, slower runs. PPO, on the other hand, gradually learns a policy that results in longer survival times by adopting lower speeds.
To gain a deeper understanding, we present the action probabilities below:
def parse_list(cell):
    # The CSVs store each probability vector as numpy's string
    # representation, e.g. "[0.05 0.1  0.5  0.25 0.1 ]".
    values = cell[1:-1].split(" ")
    arr = []
    for value in values:
        try:
            arr.append(float(value))
        except ValueError:
            # Skip empty tokens produced by repeated spaces.
            continue
    assert len(arr) == 5
    return np.array(arr, dtype=np.float32)
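Equivalently, `str.split()` with no separator collapses the repeated spaces that NumPy uses to align its array repr, avoiding the try/except over empty tokens. A minimal alternative sketch (the `parse_probs` name is illustrative):

```python
import numpy as np

def parse_probs(cell):
    # str.split() with no argument splits on any run of whitespace,
    # so numpy's column-aligned repr parses without special-casing
    # empty tokens.
    arr = np.array(cell[1:-1].split(), dtype=np.float32)
    assert arr.shape == (5,)
    return arr

probs = parse_probs("[0.05 0.1  0.5  0.25 0.1 ]")
```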
dqn_action_probs = np.transpose(np.vstack(dqn_merge_results["action_probs"].apply(parse_list).values))
dqn_attention_action_probs = np.transpose(np.vstack(dqn_attention_merge_results["action_probs"].apply(parse_list).values))
a3c_action_probs = np.transpose(np.vstack(a3c_merge_results["action_probs"].apply(parse_list).values))
a3c_attention_action_probs = np.transpose(np.vstack(a3c_attention_merge_results["action_probs"].apply(parse_list).values))
ppo_action_probs = np.transpose(np.vstack(ppo_merge_results["action_probs"].apply(parse_list).values))
ppo_attention_action_probs = np.transpose(np.vstack(ppo_attention_merge_results["action_probs"].apply(parse_list).values))
# Create subplots
fig = make_subplots(rows=5, cols=2)
episode = np.arange(1,201)
# Add line charts to the subplots
fig.add_trace(go.Scatter(y=dqn_action_probs[0], x=episode, mode='lines', name='DQN', line=dict(color='#35b779')), row=1, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[1], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[2], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[3], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=4, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[4], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=5, col=1)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[0], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[1], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[2], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[3], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=4, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[4], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=5, col=2)
fig.add_trace(go.Scatter(y=a3c_action_probs[0], x=episode, mode='lines', name='A3C', line=dict(color='#31688e')), row=1, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[1], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[2], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[3], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=4, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[4], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=5, col=1)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[0], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[1], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[2], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[3], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=4, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[4], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=5, col=2)
fig.add_trace(go.Scatter(y=ppo_action_probs[0], x=episode, mode='lines', name='PPO', line=dict(color='#440154')), row=1, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[1], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[2], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[3], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=4, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[4], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=5, col=1)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[0], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[1], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[2], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[3], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=4, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[4], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=5, col=2)
fig.update_layout(
    annotations=[
        dict(
            text="Without attention",  # subtitle for the left column
            xref="x1",                 # x relative to the first subplot's x-axis
            yref="paper",              # y relative to the entire figure
            x=1,                       # right edge of the subplot
            y=1.05,                    # just above the plotting area
            showarrow=False,
            font=dict(size=14)
        ),
        dict(
            text="With attention",     # subtitle for the right column
            xref="x2",                 # x relative to the second subplot's x-axis
            yref="paper",
            x=1,
            y=1.05,
            showarrow=False,
            font=dict(size=14)
        )
    ]
)
fig.update_layout(
    title='Action Probabilities for Merge Environment',
    xaxis1=dict(gridcolor='gray', title='Episode'),
    yaxis1=dict(gridcolor='gray', title='Lane Left', range=[0, 1]),
    xaxis2=dict(gridcolor='gray', title='Episode'),
    yaxis2=dict(gridcolor='gray', title='Lane Left', range=[0, 1]),
    xaxis3=dict(gridcolor='gray', title='Episode'),
    yaxis3=dict(gridcolor='gray', title='Idle', range=[0, 1]),
    xaxis4=dict(gridcolor='gray', title='Episode'),
    yaxis4=dict(gridcolor='gray', title='Idle', range=[0, 1]),
    xaxis5=dict(gridcolor='gray', title='Episode'),
    yaxis5=dict(gridcolor='gray', title='Lane Right', range=[0, 1]),
    xaxis6=dict(gridcolor='gray', title='Episode'),
    yaxis6=dict(gridcolor='gray', title='Lane Right', range=[0, 1]),
    xaxis7=dict(gridcolor='gray', title='Episode'),
    yaxis7=dict(gridcolor='gray', title='Faster', range=[0, 1]),
    xaxis8=dict(gridcolor='gray', title='Episode'),
    yaxis8=dict(gridcolor='gray', title='Faster', range=[0, 1]),
    xaxis9=dict(gridcolor='gray', title='Episode'),
    yaxis9=dict(gridcolor='gray', title='Slower', range=[0, 1]),
    xaxis10=dict(gridcolor='gray', title='Episode'),
    yaxis10=dict(gridcolor='gray', title='Slower', range=[0, 1]),
    width=1000,
    height=900,
    plot_bgcolor='white'
)
# Show plot
fig.show()
The action-probability analysis offers a different perspective. In the MLP case, DQN's policy remains relatively diverse, with accelerating and changing to the right lane as the dominant actions. The on-policy algorithms, however, converge to policies that prioritize specific actions: A3C eventually adopts a strategy that relies heavily on deceleration, while PPO tends to favor changing to the left lane, decelerating, or maintaining a constant speed. This indicates that A3C and PPO adopt generally safer approaches to the task.
In the attention-integrated versions, DQN immediately converges to a suboptimal policy of changing to the right lane, while A3C prioritizes speeding up. Read together with the Mean Length and Mean Speed plots, this suggests episodes in which speed increases only gradually, producing long but not particularly fast runs. Both algorithms converge to very risky strategies. Conversely, PPO demonstrates a balanced behavior involving both accelerating and decelerating, which, in the "merge" environment, could be characterized as optimal performance.
Highway¶
Next, we review the performance of the algorithms in the "highway" environment. The videos presented below follow the same structure as those for the "merge" environment.
video_paths = [r"videos\dqn_highway.mp4", r"videos\a3c_highway.mp4", r"videos\ppo_highway.mp4", r"videos\dqn_highway_attention.mp4", r"videos\a3c_highway_attention.mp4", r"videos\ppo_highway_attention.mp4"]
video_captions = ["DQN", "A3C", "PPO", "DQN with attention", "A3C with attention", "PPO with attention"]
video_grid = VideoGrid(video_paths, video_captions)
display(video_grid)
# Create subplots
fig = make_subplots(rows=3, cols=2)
episode = np.arange(1,201)
# Add line charts to the subplots
fig.add_trace(go.Scatter(y=dqn_highway_results["returns"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779')), row=1, col=1)
fig.add_trace(go.Scatter(y=dqn_highway_results["lengths"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=dqn_highway_results["speeds"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=dqn_attention_highway_results["returns"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_highway_results["lengths"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_highway_results["speeds"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=a3c_highway_results["returns"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e')), row=1, col=1)
fig.add_trace(go.Scatter(y=a3c_highway_results["lengths"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=a3c_highway_results["speeds"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=a3c_attention_highway_results["returns"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_highway_results["lengths"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_highway_results["speeds"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=ppo_highway_results["returns"], x=episode, mode='lines', name='PPO', line=dict(color='#440154')), row=1, col=1)
fig.add_trace(go.Scatter(y=ppo_highway_results["lengths"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=ppo_highway_results["speeds"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=ppo_attention_highway_results["returns"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_highway_results["lengths"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_highway_results["speeds"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=2)
fig.update_layout(
    annotations=[
        dict(
            text="Without attention",  # subtitle for the left column
            xref="x1",                 # x relative to the first subplot's x-axis
            yref="paper",              # y relative to the entire figure
            x=1,                       # right edge of the subplot
            y=1.05,                    # just above the plotting area
            showarrow=False,
            font=dict(size=14)
        ),
        dict(
            text="With attention",     # subtitle for the right column
            xref="x2",                 # x relative to the second subplot's x-axis
            yref="paper",
            x=1,
            y=1.05,
            showarrow=False,
            font=dict(size=14)
        )
    ]
)
fig.update_layout(
    title='Numerical Results for Highway Environment',
    xaxis1=dict(gridcolor='gray', title='Episode'),
    yaxis1=dict(gridcolor='gray', title='Mean Return'),
    xaxis2=dict(gridcolor='gray', title='Episode'),
    yaxis2=dict(gridcolor='gray', title='Mean Return'),
    xaxis3=dict(gridcolor='gray', title='Episode'),
    yaxis3=dict(gridcolor='gray', title='Mean Length'),
    xaxis4=dict(gridcolor='gray', title='Episode'),
    yaxis4=dict(gridcolor='gray', title='Mean Length'),
    xaxis5=dict(gridcolor='gray', title='Episode'),
    yaxis5=dict(gridcolor='gray', title='Mean Speed'),
    xaxis6=dict(gridcolor='gray', title='Episode'),
    yaxis6=dict(gridcolor='gray', title='Mean Speed'),
    width=1000,
    height=900,
    plot_bgcolor='white'
)
# Show plot
fig.show()
For the models without attention integrated, the total rewards are initially similar across the board. However, in the final stages of the training process, PPO outperforms the other two models, with A3C showing the worst results. In contrast, in the attention-based case, DQN exhibits better overall performance than both on-policy agents.
When reviewing the Mean Length, we observe that all models perform similarly, with the off-policy model being slightly worse in the MLP architecture case. On the other hand, PPO with attention performs quite poorly compared to the other two models, although it gradually improves. Notably, only A3C achieves the maximum number of steps under both state-representation frameworks.
The Mean Speed shows that the average speed for DQN and PPO is significantly higher than A3C's when there is no attention framework integrated. A similar trend is observed in the attention-integrated case, with PPO outperforming DQN. These metrics indicate that A3C converges to a safer but slower behavior, which undermines efficiency. DQN converges to a suboptimal policy, while PPO manages to balance safety and efficiency more effectively.
dqn_action_probs = np.transpose(np.vstack(dqn_highway_results["action_probs"].apply(parse_list).values))
dqn_attention_action_probs = np.transpose(np.vstack(dqn_attention_highway_results["action_probs"].apply(parse_list).values))
a3c_action_probs = np.transpose(np.vstack(a3c_highway_results["action_probs"].apply(parse_list).values))
a3c_attention_action_probs = np.transpose(np.vstack(a3c_attention_highway_results["action_probs"].apply(parse_list).values))
ppo_action_probs = np.transpose(np.vstack(ppo_highway_results["action_probs"].apply(parse_list).values))
ppo_attention_action_probs = np.transpose(np.vstack(ppo_attention_highway_results["action_probs"].apply(parse_list).values))
# Create subplots
fig = make_subplots(rows=5, cols=2)
episode = np.arange(1,201)
# Add line charts to the subplots
fig.add_trace(go.Scatter(y=dqn_action_probs[0], x=episode, mode='lines', name='DQN', line=dict(color='#35b779')), row=1, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[1], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[2], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[3], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=4, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[4], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=5, col=1)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[0], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[1], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[2], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[3], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=4, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[4], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=5, col=2)
fig.add_trace(go.Scatter(y=a3c_action_probs[0], x=episode, mode='lines', name='A3C', line=dict(color='#31688e')), row=1, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[1], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[2], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[3], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=4, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[4], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=5, col=1)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[0], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[1], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[2], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[3], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=4, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[4], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=5, col=2)
fig.add_trace(go.Scatter(y=ppo_action_probs[0], x=episode, mode='lines', name='PPO', line=dict(color='#440154')), row=1, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[1], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[2], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[3], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=4, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[4], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=5, col=1)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[0], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[1], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[2], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[3], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=4, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[4], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=5, col=2)
fig.update_layout(
annotations=[
dict(
text="Without attention",  # Subtitle for the left-hand column
xref="x1",  # x measured in the first subplot's axis coordinates
yref="paper",  # y measured relative to the entire figure
x=1,  # right edge of the subplot's x-range
y=1.05,  # just above the top row of subplots
showarrow=False,
font=dict(size=14)
),
dict(
text="With attention",  # Subtitle for the right-hand column
xref="x2",  # x measured in the second subplot's axis coordinates
yref="paper",  # y measured relative to the entire figure
x=1,  # right edge of the subplot's x-range
y=1.05,  # just above the top row of subplots
showarrow=False,
font=dict(size=14)
)
]
)
fig.update_layout(
title='Action Probabilities for Highway Environment',
xaxis1=dict(gridcolor='gray', title='Episode'),
yaxis1=dict(gridcolor='gray', title='Lane Left', range=[0, 1]),
xaxis2=dict(gridcolor='gray', title='Episode'),
yaxis2=dict(gridcolor='gray', title='Lane Left', range=[0, 1]),
xaxis3=dict(gridcolor='gray', title='Episode'),
yaxis3=dict(gridcolor='gray', title='Idle', range=[0, 1]),
xaxis4=dict(gridcolor='gray', title='Episode'),
yaxis4=dict(gridcolor='gray', title='Idle', range=[0, 1]),
xaxis5=dict(gridcolor='gray', title='Episode'),
yaxis5=dict(gridcolor='gray', title='Lane Right', range=[0, 1]),
xaxis6=dict(gridcolor='gray', title='Episode'),
yaxis6=dict(gridcolor='gray', title='Lane Right', range=[0, 1]),
xaxis7=dict(gridcolor='gray', title='Episode'),
yaxis7=dict(gridcolor='gray', title='Faster', range=[0, 1]),
xaxis8=dict(gridcolor='gray', title='Episode'),
yaxis8=dict(gridcolor='gray', title='Faster', range=[0, 1]),
xaxis9=dict(gridcolor='gray', title='Episode'),
yaxis9=dict(gridcolor='gray', title='Slower', range=[0, 1]),
xaxis10=dict(gridcolor='gray', title='Episode'),
yaxis10=dict(gridcolor='gray', title='Slower', range=[0, 1]),
width=1000,
height=900,
plot_bgcolor='white'
)
# Show plot
fig.show()
Regarding the action probability analysis, both DQN and PPO exhibit a diverse distribution of chosen actions when using the MLP framework. However, DQN's policy converges to actions involving lower speeds than PPO's, confirming our previous findings. A3C's predominant action, on the other hand, becomes turning left, which, given our earlier observations, indicates that the agent's policy amounts to keeping to the left-most lane at a very low speed, a suboptimal strategy.
The picture changes significantly once the attention framework is incorporated. DQN quickly converges to maintaining a constant speed, which in most scenarios results in a crash. A3C initially prioritizes slow speeds but then converges to keeping the agent at a constant speed and a safe distance from other cars. Because both of these policies are suboptimal, the two agents occasionally lead their vehicles into collisions. In contrast, PPO exhibits a more balanced behavior, prioritizing staying in the right lane and accelerating. This model demonstrates optimal behavior for the highway environment: it maintains the right lane, uses a diverse portfolio of actions when necessary, and prioritizes both safety and efficiency.
Roundabout¶
Finally, we present our findings from the training in the "roundabout" environment. The corresponding videos are provided below:
video_paths = [r"videos\dqn_roundabout.mp4", r"videos\a3c_roundabout.mp4", r"videos\ppo_roundabout.mp4", r"videos\dqn_roundabout_attention.mp4", r"videos\a3c_roundabout_attention.mp4", r"videos\ppo_roundabout_attention.mp4"]
video_captions = ["DQN", "A3C", "PPO", "DQN with attention", "A3C with attention", "PPO with attention"]
video_grid = VideoGrid(video_paths, video_captions)
display(video_grid)
# Create subplots
fig = make_subplots(rows=3, cols=2)
episode = np.arange(1,201)
# Add line charts to the subplots
fig.add_trace(go.Scatter(y=dqn_roundabout_results["returns"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779')), row=1, col=1)
fig.add_trace(go.Scatter(y=dqn_roundabout_results["lengths"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=dqn_roundabout_results["speeds"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=dqn_attention_roundabout_results["returns"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_roundabout_results["lengths"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_roundabout_results["speeds"], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=a3c_roundabout_results["returns"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e')), row=1, col=1)
fig.add_trace(go.Scatter(y=a3c_roundabout_results["lengths"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=a3c_roundabout_results["speeds"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=a3c_attention_roundabout_results["returns"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_roundabout_results["lengths"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_roundabout_results["speeds"], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=ppo_roundabout_results["returns"], x=episode, mode='lines', name='PPO', line=dict(color='#440154')), row=1, col=1)
fig.add_trace(go.Scatter(y=ppo_roundabout_results["lengths"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=ppo_roundabout_results["speeds"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=ppo_attention_roundabout_results["returns"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_roundabout_results["lengths"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_roundabout_results["speeds"], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=2)
fig.update_layout(
annotations=[
dict(
text="Without attention",  # Subtitle for the left-hand column
xref="x1",  # x measured in the first subplot's axis coordinates
yref="paper",  # y measured relative to the entire figure
x=1,  # right edge of the subplot's x-range
y=1.05,  # just above the top row of subplots
showarrow=False,
font=dict(size=14)
),
dict(
text="With attention",  # Subtitle for the right-hand column
xref="x2",  # x measured in the second subplot's axis coordinates
yref="paper",  # y measured relative to the entire figure
x=1,  # right edge of the subplot's x-range
y=1.05,  # just above the top row of subplots
showarrow=False,
font=dict(size=14)
)
]
)
fig.update_layout(
title='Numerical Results for Roundabout Environment',
xaxis1=dict(gridcolor='gray', title='Episode'),
yaxis1=dict(gridcolor='gray', title='Mean Return'),
xaxis2=dict(gridcolor='gray', title='Episode'),
yaxis2=dict(gridcolor='gray', title='Mean Return'),
xaxis3=dict(gridcolor='gray', title='Episode'),
yaxis3=dict(gridcolor='gray', title='Mean Length'),
xaxis4=dict(gridcolor='gray', title='Episode'),
yaxis4=dict(gridcolor='gray', title='Mean Length'),
xaxis5=dict(gridcolor='gray', title='Episode'),
yaxis5=dict(gridcolor='gray', title='Mean Speed'),
xaxis6=dict(gridcolor='gray', title='Episode'),
yaxis6=dict(gridcolor='gray', title='Mean Speed'),
width=1000,
height=900,
plot_bgcolor='white'
)
# Show plot
fig.show()
The Mean Return plots indicate that, overall, DQN and PPO outperform A3C under both neural network representations; with the attention-based framework, however, PPO surpasses DQN.
The Mean Length fluctuates significantly for all algorithms, but the policy-based methods (PPO and A3C) exhibit slightly better results than the value-based DQN.
The picture is more complex for Mean Speed. With the MLP framework, DQN and PPO achieve higher average speeds than A3C, with DQN slightly faster than PPO, and all three models exhibit fluctuating speeds. With the attention-based framework, both PPO and DQN maintain consistently high speeds, with DQN slightly slower than PPO. The A3C agent is generally the slowest of the three, though it occasionally reaches peaks that surpass the average speeds of both PPO and DQN.
dqn_action_probs = np.transpose(np.vstack(dqn_roundabout_results["action_probs"].apply(parse_list).values))
dqn_attention_action_probs = np.transpose(np.vstack(dqn_attention_roundabout_results["action_probs"].apply(parse_list).values))
a3c_action_probs = np.transpose(np.vstack(a3c_roundabout_results["action_probs"].apply(parse_list).values))
a3c_attention_action_probs = np.transpose(np.vstack(a3c_attention_roundabout_results["action_probs"].apply(parse_list).values))
ppo_action_probs = np.transpose(np.vstack(ppo_roundabout_results["action_probs"].apply(parse_list).values))
ppo_attention_action_probs = np.transpose(np.vstack(ppo_attention_roundabout_results["action_probs"].apply(parse_list).values))
# Create subplots
fig = make_subplots(rows=5, cols=2)
episode = np.arange(1,201)
# Add line charts to the subplots
fig.add_trace(go.Scatter(y=dqn_action_probs[0], x=episode, mode='lines', name='DQN', line=dict(color='#35b779')), row=1, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[1], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[2], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[3], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=4, col=1)
fig.add_trace(go.Scatter(y=dqn_action_probs[4], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=5, col=1)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[0], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[1], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[2], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[3], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=4, col=2)
fig.add_trace(go.Scatter(y=dqn_attention_action_probs[4], x=episode, mode='lines', name='DQN', line=dict(color='#35b779'), showlegend=False), row=5, col=2)
fig.add_trace(go.Scatter(y=a3c_action_probs[0], x=episode, mode='lines', name='A3C', line=dict(color='#31688e')), row=1, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[1], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[2], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[3], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=4, col=1)
fig.add_trace(go.Scatter(y=a3c_action_probs[4], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=5, col=1)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[0], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[1], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[2], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[3], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=4, col=2)
fig.add_trace(go.Scatter(y=a3c_attention_action_probs[4], x=episode, mode='lines', name='A3C', line=dict(color='#31688e'), showlegend=False), row=5, col=2)
fig.add_trace(go.Scatter(y=ppo_action_probs[0], x=episode, mode='lines', name='PPO', line=dict(color='#440154')), row=1, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[1], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[2], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[3], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=4, col=1)
fig.add_trace(go.Scatter(y=ppo_action_probs[4], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=5, col=1)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[0], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[1], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=2, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[2], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=3, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[3], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=4, col=2)
fig.add_trace(go.Scatter(y=ppo_attention_action_probs[4], x=episode, mode='lines', name='PPO', line=dict(color='#440154'), showlegend=False), row=5, col=2)
fig.update_layout(
annotations=[
dict(
text="Without attention",  # Subtitle for the left-hand column
xref="x1",  # x measured in the first subplot's axis coordinates
yref="paper",  # y measured relative to the entire figure
x=1,  # right edge of the subplot's x-range
y=1.05,  # just above the top row of subplots
showarrow=False,
font=dict(size=14)
),
dict(
text="With attention",  # Subtitle for the right-hand column
xref="x2",  # x measured in the second subplot's axis coordinates
yref="paper",  # y measured relative to the entire figure
x=1,  # right edge of the subplot's x-range
y=1.05,  # just above the top row of subplots
showarrow=False,
font=dict(size=14)
)
]
)
fig.update_layout(
title='Action Probabilities for Roundabout Environment',
xaxis1=dict(gridcolor='gray', title='Episode'),
yaxis1=dict(gridcolor='gray', title='Lane Left', range=[0, 1]),
xaxis2=dict(gridcolor='gray', title='Episode'),
yaxis2=dict(gridcolor='gray', title='Lane Left', range=[0, 1]),
xaxis3=dict(gridcolor='gray', title='Episode'),
yaxis3=dict(gridcolor='gray', title='Idle', range=[0, 1]),
xaxis4=dict(gridcolor='gray', title='Episode'),
yaxis4=dict(gridcolor='gray', title='Idle', range=[0, 1]),
xaxis5=dict(gridcolor='gray', title='Episode'),
yaxis5=dict(gridcolor='gray', title='Lane Right', range=[0, 1]),
xaxis6=dict(gridcolor='gray', title='Episode'),
yaxis6=dict(gridcolor='gray', title='Lane Right', range=[0, 1]),
xaxis7=dict(gridcolor='gray', title='Episode'),
yaxis7=dict(gridcolor='gray', title='Faster', range=[0, 1]),
xaxis8=dict(gridcolor='gray', title='Episode'),
yaxis8=dict(gridcolor='gray', title='Faster', range=[0, 1]),
xaxis9=dict(gridcolor='gray', title='Episode'),
yaxis9=dict(gridcolor='gray', title='Slower', range=[0, 1]),
xaxis10=dict(gridcolor='gray', title='Episode'),
yaxis10=dict(gridcolor='gray', title='Slower', range=[0, 1]),
width=1000,
height=900,
plot_bgcolor='white'
)
# Show plot
fig.show()
The action probability analysis provides further insight into each agent's policy. With the MLP architecture, both DQN and PPO exhibit somewhat diverse behaviors, with turning right as the dominant action. In the "roundabout" environment this is relatively close to optimal, as staying in the right lane is generally a good strategy. A3C, however, converges to a policy of keeping to the left lane and decelerating.
The results differ significantly with the attention-based architecture. DQN immediately converges to a policy that involves only staying in the left lane, which leads to collisions as other vehicles are not adequately considered in the decision-making process. A3C prioritizes accelerating as a primary action, a very risky strategy. Conversely, PPO demonstrates a balanced action distribution, indicating it uses a diverse set of actions to adapt to the situation as effectively as possible.
7. Conclusion¶
In this project we applied three deep reinforcement learning algorithms — DQN, A3C, and PPO — in an autonomous driving environment. Their performance was evaluated across three distinct driving tasks: "merge", "highway", and "roundabout". Using four evaluation metrics — total rewards, number of steps per episode, mean speed per episode, and action probability — we found that DQN generally does not converge to a viable safe policy. A3C converges too quickly to a suboptimal policy that lacks efficiency. PPO, while more balanced, still fails to achieve optimal performance in most cases.
We further implemented an attention-based framework for the neural network architecture, which significantly improved the policy of the PPO algorithm. This enhancement allowed PPO to converge to an optimal policy that considers both safety and efficiency. For DQN and A3C, the impact of the attention framework was inconclusive.
Several limitations of our project should be noted. First, traffic restrictions were not considered in the simulated environment, reducing the applicability of the models to real-world scenarios. Second, the reward structures did not account for passenger comfort, as safety and efficiency were prioritized. This led to some agents performing unnecessary lane changes, which could cause passenger distress and increase collision danger. This issue might be mitigated with more extensive training, which was constrained by limited computational resources.
Future work could involve utilizing an ensemble approach by combining the multiple state representation forms provided by "HighwayEnv" to enhance the agents' decision-making capabilities. Additionally, integrating traffic control systems through multi-agent collaboration could help agents communicate and work together to develop optimal policies to reduce collisions and traffic congestion.
8. References¶
[1] National Highway Traffic Safety Administration. (2018). Critical Reasons for Crashes Investigated in the National Motor Vehicle Crash Causation Survey. In: https://crashstats.nhtsa.dot.gov/Api/Public/ViewPublication/812506
[2] Oi, M. (2024, April 9). Advances in Autonomous Vehicle Technology. BBC News. Available at: https://www.bbc.co.uk/news/articles/c14kggkr4vro
[3] Metz, R. (2016, February 29). Google's Self-Driving Car Probably Caused Its First Accident. MIT Technology Review. Available at: https://www.technologyreview.com/2016/02/29/161816/googles-self-driving-car-probably-caused-its-first-accident/
[4] Leurent, E. (2018). An environment for autonomous driving decision-making. In: https://github.com/eleurent/highway-env
[5] Leurent, E., & Mercat, J. (2019). Social Attention for Autonomous Decision-Making in Dense Traffic. In: https://eleurent.github.io/social-attention/
[6] Sutton, R. S., & Barto, A. G. (2018). Reinforcement Learning: An Introduction (2nd ed.). In: http://incompleteideas.net/book/RLbook2020.pdf
[7] Schulman, J., Wolski, F., Dhariwal, P., Radford, A., & Klimov, O. (2017). Proximal Policy Optimization Algorithms. In: https://arxiv.org/abs/1707.06347
[8] Wolf, P., Hubschneider, C., Weber, M., & Zollner, J. (2017). Learning How to Drive in a Real World Simulation with Deep Q-Networks. Retrieved from ResearchGate
[9] El Sallab, A., Abdou, M., Perot, E., & Yogamani, S. (2017). Deep Reinforcement Learning Framework for Autonomous Driving. Valeo Egypt, Cairo; Valeo Bobigny, France; Valeo Vision Systems, Ireland. Retrieved from arXiv.
[10] Mnih, V., Kavukcuoglu, K., Silver, D., Rusu, A. A., Veness, J., Bellemare, M. G., Graves, A., Riedmiller, M., Fidjeland, A. K., Ostrovski, G., et al. (2015). Human-level control through deep reinforcement learning. Nature. Retrieved from IEEE Xplore
[11] Chen, J., Yuan, B., & Tomizuka, M. (2019). Model-free Deep Reinforcement Learning for Urban Autonomous Driving. IEEE. Retrieved from IEEE Xplore.
[12] Chen, I.-M., & Chan, C.-Y. (2020). Deep reinforcement learning based path tracking controller for autonomous vehicle. Retrieved from SAGE Journals.
[13] Mnih, V., Badia, A. P., Mirza, M., Graves, A., Lillicrap, T. P., Harley, T., Silver, D., & Kavukcuoglu, K. (2016). Asynchronous Methods for Deep Reinforcement Learning. In: https://arxiv.org/pdf/1602.01783
[14] Wang, J., Zhang, Q., & Zhao, D. (2022). Highway Lane Change Decision-Making via Attention-Based Deep Reinforcement Learning. IEEE/CAA Journal of Automatica Sinica, 9(3), 567-569. doi: 10.1109/JAS.2021.1004395. In: https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=9664628
Appendix A: Monte Carlo Tree Search¶
For this project, we employed a discrete version of the environment known as "TimeToCollision". Each observation is represented by three grids, one per ego-vehicle speed, each measuring 3 by 10: the 3 rows correspond to three lanes (the ego-vehicle's lane and its immediate neighbours) and the 10 columns to discretized time bins at 1-second intervals. In this representation, a value of 1 in a cell indicates a predicted collision in that time bin, assuming that other vehicles do not change lanes or speeds. This simplification ensures that the model focuses only on essential dynamics, reducing computational complexity.
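To make the grid encoding concrete, here is a minimal sketch (our own illustration with hypothetical helper names, not highway-env's internal implementation) of how one lane's row of 1-second bins would be filled:

```python
# Illustrative sketch of one lane row of a time-to-collision grid
# (assumption: simplified stand-in for highway-env's "TimeToCollision"
# observation, with 1-second bins over a 10-second horizon).

def ttc_bin(gap_m, closing_speed_ms, horizon=10):
    """Index of the 1-second bin where a collision is predicted,
    or None if no collision is predicted within the horizon."""
    if closing_speed_ms <= 0:          # not closing in -> no predicted collision
        return None
    ttc = gap_m / closing_speed_ms     # seconds until collision
    return int(ttc) if ttc < horizon else None

def lane_row(gap_m, closing_speed_ms, horizon=10):
    """One lane's row of the grid: 1 in the predicted-collision bin, else 0."""
    row = [0] * horizon
    b = ttc_bin(gap_m, closing_speed_ms, horizon)
    if b is not None:
        row[b] = 1
    return row

# A vehicle 35 m ahead, closing at 10 m/s -> collision predicted in bin 3 (3.5 s).
print(lane_row(35, 10))   # [0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
```

Stacking three such rows per speed level yields the three 3-by-10 grids described above.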
Nevertheless, the state space remains vast at $2^{90}$ possible observations, making standard tabular methods highly inefficient. Consequently, we opt for Monte Carlo Tree Search (MCTS) rather than traditional Monte Carlo methods, which require the storage of all states and actions—an impractical approach for environments with such extensive and complex state spaces. In contrast, MCTS strategically navigates the state space by selectively expanding potential future states from the current scenario. This targeted exploration not only improves computational efficiency but also enhances the quality of decision-making by focusing on the more consequential states.
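A quick back-of-the-envelope calculation supports this infeasibility claim, assuming the 5 discrete meta-actions used in this project and one 8-byte float per $Q(s, a)$ entry:

```python
# Back-of-the-envelope check of the tabular-storage argument above
# (assumptions: 5 discrete meta-actions, one 8-byte float per Q(s, a) entry).
n_states = 2 ** 90        # binary 3 x 3 x 10 observation grid
n_actions = 5             # DiscreteMetaAction
bytes_per_entry = 8       # one double-precision Q-value
table_bytes = n_states * n_actions * bytes_per_entry
print(f"~{table_bytes / 1e24:.0f} yottabytes")   # far beyond any real storage
```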
MCTS operates through several phases starting at the root node, which symbolizes the current state of the simulation:
- Selection: Nodes are selected based on a strategy that balances between exploiting well-performing paths and exploring less frequented ones. The selection process is guided by a slightly modified version of the Upper Confidence Bound (UCB1) formula, which is calculated as:
$$ \text{UCB1} = Q_j + \sqrt{\frac{100 \cdot \log\left(\sum_{i} N_i + 1\right)}{N_j + 10^{-10}}} $$
where $Q_j$ is the current estimate of the expected return for taking action $j$, $N_j$ is the count of how many times action $j$ has been taken from this state, and $\sum_{i} N_i$ is the total count of all actions taken from this state. The tiny constant $10^{-10}$ in the denominator ensures that untried actions receive an effectively infinite exploration bonus.
- Expansion: When the search reaches a leaf node that does not terminate the driving simulation, new child nodes are added, expanding the tree to include unexplored actions.
- Simulation: From the newly added nodes, the algorithm simulates the outcomes of actions under a random policy until the simulation reaches a terminal state.
- Backpropagation: Once a terminal state is reached, the results (total rewards) are propagated back along the visited path, updating each node's statistics (return estimates and visit counts) to reflect the new data.
This structured approach ensures that each decision point in the tree is updated with the most comprehensive and current information, allowing for more informed and accurate decision-making in complex environments. For more information on the algorithm, please refer to Browne, C. B., Powley, E., Whitehouse, D., Lucas, S. M., Cowling, P. I., Rohlfshagen, P., Tavener, S., Perez, D., Samothrakis, S., & Colton, S. (2012). A Survey of Monte Carlo Tree Search Methods. IEEE Transactions on Computational Intelligence and AI in Games, 4(1). Retrieved from IEEE Xplore.
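The selection rule can be sketched as a small stand-alone snippet (our own minimal illustration of the modified UCB1 formula; the function names are hypothetical, while the exploration constant 100 and the $10^{-10}$ offset mirror the agent code in this appendix):

```python
import math

def ucb1_scores(q_values, counts, c=100.0, eps=1e-10):
    """Modified UCB1 score per action: Q_j + sqrt(c * log(sum_i N_i + 1) / (N_j + eps))."""
    total = sum(counts)
    return [q + math.sqrt(c * math.log(total + 1) / (n + eps))
            for q, n in zip(q_values, counts)]

def select_ucb1(q_values, counts):
    """Pick the action maximising the UCB1 score; the tiny eps makes
    untried actions (count 0) dominate the selection."""
    scores = ucb1_scores(q_values, counts)
    return max(range(len(scores)), key=scores.__getitem__)

# Three actions: the untried one wins despite its lower Q estimate.
print(select_ucb1([0.9, 0.1, 0.5], [10, 0, 4]))   # 1
```

Once all actions have comparable visit counts, the bonus term shrinks and the selection is driven by the return estimates themselves.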
We implement this reinforcement learning method exclusively in the standard "merge" environment; due to its high computational demands and suboptimal performance, we decide against extending our tests to the other environments. Detailed code for the algorithm and its training procedure is provided below.
"""
env5 = gym.make("merge-v0", render_mode=None)
env5.configure({
"observation":{
"type":"TimeToCollision"
},
"action": {
"type": "DiscreteMetaAction"
},
"policy_frequency":8,
"simulation_frequency":16,
"normalize_reward": True
})
import copy
from tqdm import tqdm
class MCTSagent:
def __init__(self, env, iter=100, Q_values=None, count=None):
self.env_backup = copy.deepcopy(env)
self.iter = iter
self.Q_values = Q_values if Q_values is not None else {}
self.count = count if count is not None else {}
# Rollout function
def rollout(self, depth):
total_reward = 0
env_local = copy.deepcopy(self.env_backup)
for i in range(depth):
action = env_local.action_space.sample()
obs, reward, done, truncated, info = env_local.step(action)
total_reward += reward
if done:
break
return total_reward
def select_action(self, state, depth, train=True):
if train:
self.reset_env()
for i in range(self.iter):
self.simulate(state, depth)
state = self.to_tuple_of_tuples(state)
state_q_value = self.Q_values.get(state, None)
if state_q_value is None:
return self.env_backup.action_space.sample()
action = np.argmax(state_q_value)
return action
def simulate(self, state, depth):
if depth == 0:
return 0
state = self.to_tuple_of_tuples(state)
if state not in self.Q_values.keys():
self.Q_values[state] = [0] * self.env_backup.action_space.n
self.count[state] = [0] * self.env_backup.action_space.n
q = self.rollout(depth)
N_vals = np.array(self.count[state])
Q_vals= np.array(self.Q_values[state])
action = np.argmax(Q_vals + np.sqrt(100 * np.log(sum(N_vals) + 1) / (N_vals + 1e-10)))
obs, reward, done, truncated, info = self.env_backup.step(action)
q = reward + self.simulate(obs, depth - 1) if not done else reward
self.count[state][action] += 1
self.Q_values[state][action] = (self.Q_values[state][action] * (self.count[state][action] - 1) + q) / self.count[state][action]
return q
def reset_env(self):
self.env_backup.reset()
def to_tuple_of_tuples(self, state):
return tuple(tuple(tuple(x) for x in subarray) for subarray in state)
mcts_agent = MCTSagent(env5, iter=100)
# Training parameters
epochs = 250
depth = 10
total_steps = []
total_rewards = []
total_speeds = []
action_probs = []
for epoch in tqdm(range(epochs)):
obs, info = env5.reset()
done = truncated = False
rewards = 0
steps = 0
speeds = []
action_counts = np.zeros(5)
while not done and not truncated and steps < 1000:
action = mcts_agent.select_action(obs, depth, train=True)
new_obs, reward, done, truncated, info = env5.step(action)
obs = new_obs
rewards += reward
steps += 1
speed = info['speed']
speeds.append(speed)
action_count = np.zeros(5)
action_count[action] = 1
action_counts += action_count
print(f"Epoch {epoch+1}/{epochs} completed.")
print(f"Rewards from episode {epoch+1}: {rewards}")
total_rewards.append(rewards)
total_steps.append(steps)
total_speeds.append(np.mean(speeds))
action_probs.append(action_counts/np.sum(action_counts))
df = pd.DataFrame({
'Total Steps': total_steps,
'Total Rewards': total_rewards,
'Total Speeds': total_speeds,
'Action Probabilities': action_probs
})
df.to_csv('mcts_merge.csv')
"""
Appendix B: Random Merge Environment¶
A modification of the merge environment from the highway-env package. An extra config parameter called "random_spawn" has been added, which determines whether the lane and location where each non-ego vehicle spawns are randomized. To run this version of the merge environment, please uncomment and copy the contents of the cell below into ~\highway_env\envs\merge_env.py. If you are unsure of the location of this file, run "pip uninstall highway-env" and the path where the package has been installed will appear; press "n" to cancel the uninstallation. Once the file has been modified and saved, run ~\highway_env\__init__.py. You should then be able to run the random merge environment by passing {"random_spawn": True} into env.configure().
# from typing import Dict, Text
# import numpy as np
# from highway_env import utils
# from highway_env.envs.common.abstract import AbstractEnv
# from highway_env.road.lane import LineType, StraightLane, SineLane
# from highway_env.road.road import Road, RoadNetwork
# from highway_env.vehicle.controller import ControlledVehicle
# from highway_env.vehicle.objects import Obstacle
# class MergeEnv(AbstractEnv):
#     """
#     A highway merge negotiation environment.
#     The ego-vehicle is driving on a highway and approaching a merge, with some vehicles incoming on the access ramp.
#     It is rewarded for maintaining a high speed and avoiding collisions, but also for making room for merging
#     vehicles.
#     """
#     @classmethod
#     def default_config(cls) -> dict:
#         cfg = super().default_config()
#         print("modified config")
#         cfg.update({
#             "collision_reward": -1,
#             "right_lane_reward": 0.1,
#             "high_speed_reward": 0.2,
#             "merging_speed_reward": -0.5,
#             "lane_change_reward": -0.05,
#             "random_spawn": True  # Vehicles spawn in random locations and at random speeds
#         })
#         return cfg
#     def _reward(self, action: int) -> float:
#         """
#         The vehicle is rewarded for driving at high speed on lanes to the right and avoiding collisions,
#         but an additional altruistic penalty is also incurred if any vehicle on the merging lane has a low speed.
#         :param action: the action performed
#         :return: the reward of the state-action transition
#         """
#         reward = sum(self.config.get(name, 0) * reward for name, reward in self._rewards(action).items())
#         return utils.lmap(reward,
#                           [self.config["collision_reward"] + self.config["merging_speed_reward"],
#                            self.config["high_speed_reward"] + self.config["right_lane_reward"]],
#                           [0, 1])
#     def _rewards(self, action: int) -> Dict[Text, float]:
#         return {
#             "collision_reward": self.vehicle.crashed,
#             "right_lane_reward": self.vehicle.lane_index[2] / 1,
#             "high_speed_reward": self.vehicle.speed_index / (self.vehicle.target_speeds.size - 1),
#             "lane_change_reward": action in [0, 2],
#             "merging_speed_reward": sum(  # Altruistic penalty
#                 (vehicle.target_speed - vehicle.speed) / vehicle.target_speed
#                 for vehicle in self.road.vehicles
#                 if vehicle.lane_index == ("b", "c", 2) and isinstance(vehicle, ControlledVehicle)
#             )
#         }
#     def _is_terminated(self) -> bool:
#         """The episode is over when a collision occurs or when the access ramp has been passed."""
#         return self.vehicle.crashed or bool(self.vehicle.position[0] > 370)
#     def _is_truncated(self) -> bool:
#         return False
#     def _reset(self) -> None:
#         self._make_road()
#         self._make_vehicles()
#     def _make_road(self) -> None:
#         """
#         Make a road composed of a straight highway and a merging lane.
#         :return: the road
#         """
#         net = RoadNetwork()
#         # Highway lanes
#         ends = [150, 80, 80, 150]  # Before, converging, merge, after
#         c, s, n = LineType.CONTINUOUS_LINE, LineType.STRIPED, LineType.NONE
#         y = [0, StraightLane.DEFAULT_WIDTH]
#         line_type = [[c, s], [n, c]]
#         line_type_merge = [[c, s], [n, s]]
#         for i in range(2):
#             net.add_lane("a", "b", StraightLane([0, y[i]], [sum(ends[:2]), y[i]], line_types=line_type[i]))
#             net.add_lane("b", "c", StraightLane([sum(ends[:2]), y[i]], [sum(ends[:3]), y[i]], line_types=line_type_merge[i]))
#             net.add_lane("c", "d", StraightLane([sum(ends[:3]), y[i]], [sum(ends), y[i]], line_types=line_type[i]))
#         # Merging lane
#         amplitude = 3.25
#         ljk = StraightLane([0, 6.5 + 4 + 4], [ends[0], 6.5 + 4 + 4], line_types=[c, c], forbidden=True)
#         lkb = SineLane(ljk.position(ends[0], -amplitude), ljk.position(sum(ends[:2]), -amplitude),
#                        amplitude, 2 * np.pi / (2 * ends[1]), np.pi / 2, line_types=[c, c], forbidden=True)
#         lbc = StraightLane(lkb.position(ends[1], 0), lkb.position(ends[1], 0) + [ends[2], 0],
#                            line_types=[n, c], forbidden=True)
#         net.add_lane("j", "k", ljk)
#         net.add_lane("k", "b", lkb)
#         net.add_lane("b", "c", lbc)
#         road = Road(network=net, np_random=self.np_random, record_history=self.config["show_trajectories"])
#         road.objects.append(Obstacle(road, lbc.position(ends[2], 0)))
#         self.road = road
#     def _make_vehicles(self) -> None:
#         """
#         Populate a road with several vehicles on the highway and on the merging lane, as well as an ego-vehicle.
#         :return: the ego-vehicle
#         """
#         road = self.road
#         ego_vehicle = self.action_type.vehicle_class(road,
#                                                      road.network.get_lane(("a", "b", 1)).position(30, 0),
#                                                      speed=30)
#         road.vehicles.append(ego_vehicle)
#         other_vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])
#         if self.config["random_spawn"] is True:
#             road.vehicles.append(other_vehicles_type(road, road.network.get_lane(("a", "b", self.np_random.choice([0, 1]))).position(self.np_random.integers(low=75, high=95), 0), speed=29))
#             road.vehicles.append(other_vehicles_type(road, road.network.get_lane(("a", "b", self.np_random.choice([0, 1]))).position(self.np_random.integers(low=50, high=70), 0), speed=31))
#             road.vehicles.append(other_vehicles_type(road, road.network.get_lane(("a", "b", 0)).position(self.np_random.integers(low=25, high=45), 0), speed=31.5))
#         else:
#             road.vehicles.append(other_vehicles_type(road, road.network.get_lane(("a", "b", 0)).position(90, 0), speed=29))
#             road.vehicles.append(other_vehicles_type(road, road.network.get_lane(("a", "b", 1)).position(70, 0), speed=31))
#             road.vehicles.append(other_vehicles_type(road, road.network.get_lane(("a", "b", 0)).position(5, 0), speed=31.5))
#         merging_v = other_vehicles_type(road, road.network.get_lane(("j", "k", 0)).position(110, 0), speed=20)
#         merging_v.target_speed = 30
#         road.vehicles.append(merging_v)
#         self.vehicle = ego_vehicle
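`_reward` squashes the weighted reward sum into [0, 1] with `utils.lmap`, a linear range mapping. A minimal standalone sketch of that mapping (the real implementation ships with highway-env; this version is for illustration only):

```python
def lmap(v: float, x: list, y: list) -> float:
    """Linearly map v from range x = [x0, x1] to range y = [y0, y1]."""
    return y[0] + (v - x[0]) * (y[1] - y[0]) / (x[1] - x[0])

# With the config above, the worst case is collision + merging penalty
# (-1 + -0.5 = -1.5) and the best case is high-speed + right-lane reward
# (0.2 + 0.1 = 0.3), so the weighted reward sum is squashed into [0, 1]:
best = lmap(0.3, [-1.5, 0.3], [0, 1])    # -> 1.0
worst = lmap(-1.5, [-1.5, 0.3], [0, 1])  # -> 0.0
```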
Appendix C: Advantage Actor-Critic (A2C)¶
An implementation of a synchronous version of the A3C algorithm. This implementation was discarded as it yielded worse results than its asynchronous counterpart.
class A2CAgent():
    def __init__(self,
                 input_shape,
                 num_actions,
                 actor_learning_rate=3e-4,
                 critic_learning_rate=3e-4,
                 use_attention=False,
                 encoding_dim=64,
                 num_heads=2):
        self.input_shape = input_shape
        self.num_actions = num_actions
        self.actor_optimizer = Adam(learning_rate=actor_learning_rate)
        self.critic_optimizer = Adam(learning_rate=critic_learning_rate)
        self.use_attention = use_attention
        if self.use_attention is False:
            input_layer, action_output, value_output = self.build_mlp()
            self.actor = Model(inputs=input_layer, outputs=action_output)
            self.critic = Model(inputs=input_layer, outputs=value_output)
        else:
            self.encoding_dim = encoding_dim
            self.num_heads = num_heads
            input_layer, encoded_features, decoder_output = self.build_ego_attention_network()
            self.actor = self.build_attention_actor(input_layer, encoded_features)
            self.critic = self.build_attention_critic(input_layer, decoder_output)

    def build_mlp(self):
        input_layer = Input(shape=self.input_shape)
        x = Dense(64, activation="tanh")(input_layer)
        x = Dense(64, activation="tanh")(x)
        action_output = Dense(self.num_actions)(x)  # unnormalised logits
        value_output = Dense(1, activation="linear")(x)
        return input_layer, action_output, value_output
    def build_ego_attention_network(self):
        input_layer = Input(shape=self.input_shape, name='input_layer')
        encoded_features = []
        for i in range(self.input_shape[0]):
            feature_vector = input_layer[:, i, :]
            if i == 0:
                # Ego encoder: output L_q, L_k, L_v
                L_q = Dense(self.encoding_dim, activation='linear', name='ego_encoding_L_q')(feature_vector)
                L_k = Dense(self.encoding_dim, activation='linear', name='ego_encoding_L_k')(feature_vector)
                L_v = Dense(self.encoding_dim, activation='linear', name='ego_encoding_L_v')(feature_vector)
                ego_encoded = [L_q, L_k, L_v]
            else:
                # Regular encoder: output L_k and L_v
                L_k = Dense(self.encoding_dim, activation='linear', name=f'encoding_L_k_{i}')(feature_vector)
                L_v = Dense(self.encoding_dim, activation='linear', name=f'encoding_L_v_{i}')(feature_vector)
            encoded_features.extend([L_k, L_v])
        # Ego attention layer
        ego_attention = EgoAttentionLayer(num_heads=self.num_heads, encoding_dim=self.encoding_dim, name='ego_attention_layer')
        ego_attention_output = ego_attention(ego_encoded[0], ego_encoded[1], ego_encoded[2])
        # Decoder layer
        decoder_output = Dense(1, activation='linear', name='value_estimate')(ego_attention_output)
        return input_layer, encoded_features, decoder_output

    def build_attention_actor(self, input_layer, encoded_features):
        x = Concatenate()(encoded_features)
        x = Flatten()(x)
        x = Dense(64, activation="tanh")(x)
        x = Dense(64, activation="tanh")(x)
        outputs = Dense(self.num_actions)(x)
        return Model(inputs=input_layer, outputs=outputs)

    def build_attention_critic(self, input_layer, decoder_output):
        return Model(inputs=input_layer, outputs=decoder_output)
    def compute_entropy(self, logits):
        # Policy entropy H = -sum(p * log p), computed from the logits
        logprobabilities = tf.keras.ops.log_softmax(logits)
        probabilities = tf.keras.ops.softmax(logits)
        return -tf.reduce_sum(probabilities * logprobabilities, axis=1)

    def sample_action(self, observation):
        logits = self.actor(observation)
        action = tf.squeeze(
            tf.keras.random.categorical(logits, 1), axis=1
        )
        return logits, action

    def logprobabilities(self, logits, action):
        # Log-probability of the taken action under the current policy
        logprobabilities_all = tf.keras.ops.log_softmax(logits)
        logprobability = tf.keras.ops.sum(
            tf.keras.ops.one_hot(action, self.num_actions) * logprobabilities_all, axis=1
        )
        return logprobability
    def train_critic(self, observation_buffer, returns):
        with tf.GradientTape() as tape:
            loss = tf.keras.losses.Huber()(self.critic(observation_buffer), returns)
        grads = tape.gradient(loss, self.critic.trainable_variables)
        self.critic_optimizer.apply_gradients(zip(grads, self.critic.trainable_variables))
        return loss

    def evaluate_actor(self, env, num_actors, max_steps):
        returns, lengths, speeds = [], [], []
        for a in range(num_actors):
            observation, _ = env.reset()
            episode_return, episode_length = 0, 0
            episode_speeds = []
            for t in range(max_steps):
                env.render()
                if self.use_attention:
                    reshaped_observation = tf.expand_dims(observation, axis=0)
                else:
                    reshaped_observation = tf.expand_dims(observation.flatten(), axis=0)
                logits = self.actor(reshaped_observation)
                # Act greedily during evaluation
                action = np.argmax(tf.keras.ops.softmax(logits))
                observation, reward, done, _, info = env.step(action)
                episode_length += 1
                episode_return += reward
                episode_speeds.append(info["speed"])
                if done or (t == max_steps - 1):
                    returns.append(episode_return)
                    lengths.append(episode_length)
                    speeds.append(np.mean(episode_speeds))
                    break  # stop stepping once the episode has ended
        env.close()
        return np.mean(returns), np.mean(lengths), np.mean(speeds)
    def train(self, env, num_episodes, num_actors, max_steps=128, eval_frequency=5, entropy_coefficient=0.01):
        for episode in range(num_episodes):
            observation_buffer = []
            value_buffer = []
            return_buffer = []
            logprobability_buffer = []
            entropy_buffer = []
            with tf.GradientTape(persistent=True) as tape:
                for a in range(num_actors):
                    observation, _ = env.reset()
                    for t in range(max_steps):
                        # env.render()  # uncomment to visualise the attempts
                        if self.use_attention:
                            reshaped_observation = tf.expand_dims(observation, axis=0)
                        else:
                            reshaped_observation = tf.expand_dims(observation.flatten(), axis=0)
                        logits, action = self.sample_action(reshaped_observation)
                        action = action[0].numpy()
                        observation_new, reward, done, _, info = env.step(action)
                        value_t = tf.keras.ops.squeeze(self.critic(reshaped_observation)).numpy()
                        logprobability_t = self.logprobabilities(logits, action)
                        entropy_t = self.compute_entropy(logits)
                        value_buffer.append(value_t)
                        entropy_buffer.append(entropy_t)
                        logprobability_buffer.append(logprobability_t)
                        return_buffer.append(reward)
                        observation_buffer.append(reshaped_observation)
                        # Update observation
                        observation = observation_new
                        if done:
                            break
                # Compute discounted returns (reward-to-go); gamma is the
                # discount factor defined globally in the notebook
                returns = []
                discounted_sum = 0
                for r in return_buffer[::-1]:
                    discounted_sum = r + gamma * discounted_sum
                    returns.insert(0, discounted_sum)
                # Train actor and critic
                observation_buffer = tf.cast(tf.squeeze(tf.convert_to_tensor(observation_buffer)), dtype=tf.float32)
                logprobability_buffer = tf.cast(tf.convert_to_tensor(logprobability_buffer), dtype=tf.float32)
                returns = tf.cast(tf.convert_to_tensor(returns), dtype=tf.float32)
                entropy_buffer = tf.cast(tf.convert_to_tensor(entropy_buffer), dtype=tf.float32)
                value_buffer = tf.cast(tf.convert_to_tensor(value_buffer), dtype=tf.float32)
                actor_loss = tf.reduce_mean(
                    tf.squeeze(-logprobability_buffer * (returns - value_buffer)
                               - tf.scalar_mul(entropy_coefficient, entropy_buffer))
                )
            actor_grads = tape.gradient(actor_loss, self.actor.trainable_variables)
            self.actor_optimizer.apply_gradients(zip(actor_grads, self.actor.trainable_variables))
            critic_loss = self.train_critic(observation_buffer, returns)
            print(f"Actor loss: {actor_loss}. Critic loss: {critic_loss}")
            if episode % eval_frequency == 0:
                mean_returns, mean_lengths, mean_speeds = self.evaluate_actor(env, num_actors, max_steps)
                print(f"Episode: {episode + 1}. Mean Return: {mean_returns}. Mean Length: {mean_lengths}. Mean Speed: {mean_speeds}.")
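The reward-to-go computation inside `train` (reverse iteration with a running discounted sum) can be checked in isolation. The helper below is a standalone restatement of that loop, not part of the agent:

```python
def discounted_returns(rewards, gamma=0.99):
    """Compute reward-to-go as in the training loop: iterate rewards in
    reverse, accumulating discounted_sum = r + gamma * discounted_sum."""
    returns = []
    discounted_sum = 0.0
    for r in reversed(rewards):
        discounted_sum = r + gamma * discounted_sum
        returns.insert(0, discounted_sum)
    return returns

print(discounted_returns([1.0, 1.0, 1.0], gamma=0.5))  # [1.75, 1.5, 1.0]
```

Each entry is the discounted sum of all rewards from that step onwards, which is the target the critic is regressed against.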